首页 > 解决方案 > 使用 asyncio 进行多进程队列同步

问题描述

我想从asyncio使用 Python 3.7 的兄弟进程中运行的循环中收集数据

理想情况下,我会使用multiprocess.JoinableQueue, 中继其join()同步调用。

但是,它的同步原语完全阻塞了事件循环(请参阅下面的部分答案以获取示例)。

说明性原型:

class MP_GatherDict(dict):
    '''A per-process dictionary which can be gathered from a single one'''
    def __init__(self):
        self.q = multiprocess.JoinableQueue()
        super().__init__()

    async def worker_process_server(self):
        while True:
            (await?) self.q.put(dict(self)) # Put a shallow copy
            (await?) self.q.join() # Wait for it to be gathered

    async def gather(self):
        all_dicts = []
        while not self.q.empty():
            all_dicts.append(await self.q.get())
            self.q.task_done()
        return all_dicts

请注意,put->get->join->put流程可能无法按预期工作,但这个问题实际上是关于在事件循环中使用multiprocess原语......asyncio

那么问题是如何从异步事件循环中最好await地使用原语?multiprocess

标签: pythonpython-asynciopython-3.7

解决方案


这个测试表明multiprocess.Queue.get()阻塞了整个事件循环:

mp_q = mp.JoinableQueue()
async def mp_queue_wait():
    try:
        print('Queue:',mp_q.get(timeout=2))
    except Exception as ex:
        print('Queue:',repr(ex))

async def main_loop_task():
    task = asyncio.get_running_loop().create_task(mp_queue_wait())
    for i in range(3):
        print(i, os.times())
        await asyncio.sleep(1)
    await task
    print(repr(task))

asyncio.run(main_loop_task())

谁的输出是:

0 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208620.18)
Queue: Empty()
1 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208622.18)
2 posix.times_result(user=0.41, system=0.04, children_user=0.0, children_system=0.0, elapsed=17208623.18)
<Task finished coro=<mp_queue_wait() done,...> result=None>

因此,我将asyncio.loop.run_in_executor()视为下一个可能的答案,但是为此生成一个执行程序/线程似乎有点过头了……

这是使用默认执行程序的相同测试:

async def mp_queue_wait():
    try:
        result = await asyncio.get_running_loop().run_in_executor(None,mp_q.get,True,2)
    except Exception as ex:
        result = ex
    print('Queue:',repr(result))
    return result 

和(期望的)结果:

0 posix.times_result(user=0.36, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210674.65)
1 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210675.65)
Queue: Empty()
2 posix.times_result(user=0.37, system=0.02, children_user=0.0, children_system=0.0, elapsed=17210676.66)
<Task finished coro=<mp_queue_wait() done, defined at /home/apozuelo/Documents/5G_SBA/Tera5G/services/db.py:211> result=Empty()>

推荐阅读