python-3.x - Python:manager.Queue() 与 asyncio。如何解决死锁?
问题描述
我试图弄清楚如何让基于 websocket 的服务器监听传入的请求,将它们放在队列中以供另一个进程工作,然后将结果放在另一个队列中,基于 websocket 的服务器可以等待所述结果并发送回复客户端。
这只是我试图学习并获得更多关于异步和进程之间共享数据的经验。我正在使用 Python 3.9.2 64 位。
现在我陷入了死锁,正如服务器代码中的“producer_handler”函数中所评论的那样。这是我正在玩的代码:
服务器:
import asyncio
import logging
import time
from multiprocessing import Manager, Process
import websockets
logging.root.setLevel(0)
def server(recievequeue, sendqueue):
async def consumer_handler(websocket, path):
while True:
logging.info('Waiting for request')
try:
request = await websocket.recv()
except Exception as exception:
logging.warning(f'consumer_handler Error: {exception}')
break
logging.info(f'Request: {request}')
recievequeue.put(request)
logging.info('Request placed in recievequeue')
async def producer_handler(websocket, path):
while True:
logging.info('Waiting for response')
response = sendqueue.get()# Deadlock is here.
try:
await websocket.send(response)
except Exception as exception:
logging.warning(f'producer_handler Error: {exception}')
break
logging.info('Response sent')
async def handler(websocket, path):
consumer_task = asyncio.ensure_future(consumer_handler(websocket, path))
producer_task = asyncio.ensure_future(producer_handler(websocket, path))
done, pending = await asyncio.wait([producer_task, consumer_task], return_when=asyncio.FIRST_COMPLETED)
for task in done:
logging.info(f'Canceling: {task}')
task.cancel()
for task in pending:
logging.info(f'Canceling: {task}')
task.cancel()
eventloop = asyncio.get_event_loop()
eventloop.run_until_complete(websockets.serve(handler, 'localhost', 8081, ssl=None))
eventloop.run_forever()
def message_handler(recievequeue, sendqueue):
while True:
# I just want to test getting a message from the recievequeue, and placing it in the sendqueue
request = recievequeue.get()
logging.info(f'Request: {request}')
time.sleep(3)
data = str(time.time())
logging.info(f'Work completed @ {data}')
sendqueue.put(data)
def main():
logging.info('Starting Application')
manager = Manager()
sendqueue = manager.Queue()
recievequeue = manager.Queue()
test_process_1 = Process(target=server, args=(recievequeue, sendqueue), name='Server')
test_process_1.start()
test_process_2 = Process(target=message_handler, args=(recievequeue, sendqueue), name='Message Handler')
test_process_2.start()
test_process_1.join()
if __name__ == '__main__':
main()
和客户:
import asyncio
import logging
import websockets
logging.root.setLevel(0)
URI = "wss://localhost:8081"
async def test():
async def consumer_handler(connection):
while True:
try:
request = await connection.recv()
except Exception as exception:
logging.warning(f'Error: {exception}')
break
logging.info(request)
async def producer_handler(connection):
while True:
await asyncio.sleep(5)
try:
await connection.send('Hello World')
except Exception as exception:
logging.warning(f'Error: {exception}')
break
async with websockets.connect(URI, ssl=None) as connection:
consumer_task = asyncio.ensure_future(consumer_handler(connection))
producer_task = asyncio.ensure_future(producer_handler(connection))
while True:
await asyncio.wait([consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
def main():
logging.info('Starting Application')
eventloop = asyncio.get_event_loop()
try:
eventloop.run_until_complete(test())
eventloop.run_forever()
except Exception as exception:
logging.warning(f'Error: {exception}')
if __name__ == '__main__':
main()
如果我删除队列,服务器和多个客户端可以毫无问题地来回交谈。我只是不知道如何获取()和放置()请求和响应。任何帮助,将不胜感激!
解决方案
因此,在浏览其他帖子后,我注意到其他人在谈论死锁和使用run_in_executor
. 经过更多测试后,我发现用以下代码替换导致死锁的行解决了该问题:
response = await eventloop.run_in_executor(None, sendqueue.get)
推荐阅读
- kubernetes - Prometheus 如何发现 Kubernetes 作业?
- python - 反转列表元素并存储为字典列表
- vue.js - 我可以在 v-data-table 的标题中使用 v-select
- vue.js - 在 vuetify 树视图中加载 vuetify 数据表
- oauth-2.0 - 在 MS Outlook 与 WSO2 EI 集成中应用注册访问被拒绝
- python - 给定一个包含多个单词和数字的句子。找出其中不包含 9 的最大数
- mongodb - 如何删除单个帖子上的答案
- c++ - 在 C++ 中使用 EdDSA/Ed25519 算法创建 JWT
- java - 从java中的给定范围生成加拿大邮政编码
- sql-server - 如何在 Angular 9 中有效加载 jquery 数据表,其中包含来自 asp.net core 和 mssql 的 1000 万条记录?