python - 使用 asyncio 进行多进程队列同步
问题描述
我想从asyncio
使用 Python 3.7 的兄弟进程中运行的循环中收集数据
理想情况下,我会使用multiprocess.JoinableQueue
, 中继其join()
同步调用。
但是,它的同步原语完全阻塞了事件循环(请参阅下面的部分答案以获取示例)。
说明性原型:
class MP_GatherDict(dict):
'''A per-process dictionary which can be gathered from a single one'''
def __init__(self):
self.q = multiprocess.JoinableQueue()
super().__init__()
async def worker_process_server(self):
while True:
(await?) self.q.put(dict(self)) # Put a shallow copy
(await?) self.q.join() # Wait for it to be gathered
async def gather(self):
all_dicts = []
while not self.q.empty():
all_dicts.append(await self.q.get())
self.q.task_done()
return all_dicts
请注意,put->get->join->put
流程可能无法按预期工作,但这个问题实际上是关于在事件循环中使用multiprocess
原语......asyncio
那么问题是如何从异步事件循环中最好await
地使用原语?multiprocess
解决方案
这个测试表明multiprocess.Queue.get()
阻塞了整个事件循环:
mp_q = mp.JoinableQueue()
async def mp_queue_wait():
try:
print('Queue:',mp_q.get(timeout=2))
except Exception as ex:
print('Queue:',repr(ex))
async def main_loop_task():
task = asyncio.get_running_loop().create_task(mp_queue_wait())
for i in range(3):
print(i, os.times())
await asyncio.sleep(1)
await task
print(repr(task))
asyncio.run(main_loop_task())
谁的输出是:
0 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208620.18)
Queue: Empty()
1 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208622.18)
2 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208623.18)
<Task finished coro=<mp_queue_wait() done,...> result=None>
因此,我将asyncio.loop.run_in_executor()视为下一个可能的答案,但是为此生成一个执行程序/线程似乎有点过头了……
这是使用默认执行程序的相同测试:
async def mp_queue_wait():
try:
result = await asyncio.get_running_loop().run_in_executor(None,mp_q.get,True,2)
except Exception as ex:
result = ex
print('Queue:',repr(result))
return result
和(期望的)结果:
0 posix.times_result(user=0.36, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210674.65)
1 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210675.65)
Queue: Empty()
2 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210676.66)
<Task finished coro=<mp_queue_wait() done, defined at /home/apozuelo/Documents/5G_SBA/Tera5G/services/db.py:211> result=Empty()>
推荐阅读
- c - 在 C 编程中,如果变量被声明为全局变量但随后被分配到 main 中,那么该变量存储在哪里?
- xamarin.forms - CollectionView 列表末尾的 KeepScrollOffset 行为
- ios - Swift:UIStackView 重叠所有视图而不是垂直堆叠它们
- java - Android Deeplink 两次打开应用
- python - print() 使用 pytesseract 清除控制台
- docker - Nextjs 环境变量始终未定义
- java - 如何从 sqlite 数据库添加到 for 循环
- ios - XCode 12.0.1 不适用于 iPhone SE 测试
- firebase - 参数不匹配的闭包调用:颤振中显示函数“[]”错误
- python - 根据值组合字典