Combining asyncio with a multi-worker ProcessPoolExecutor

Question

Is it possible to take a blocking function such as `work` and have it run concurrently in a `ProcessPoolExecutor` that has more than one worker?

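```python
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor

num_jobs = 4
# Create and install the loop explicitly: calling asyncio.get_event_loop()
# with no running loop is deprecated in recent Python versions.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)

def work():
    sleep(1)

async def producer():
    for i in range(num_jobs):
        results = await loop.run_in_executor(executor, work)
        await queue.put(results)

async def consumer():
    completed = 0
    while completed < num_jobs:
        job = await queue.get()
        completed += 1

# The guard is required when worker processes are spawned (Windows, macOS),
# because each worker re-imports this module.
if __name__ == "__main__":
    s = time()
    loop.run_until_complete(asyncio.gather(producer(), consumer()))
    print("duration", time() - s)
```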

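Running the code above on a machine with more than 4 cores takes about 4 seconds. How would you write `producer` so that this example takes only about 1 second?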

Tags: python, python-asyncio

Answer


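`await loop.run_in_executor(executor, work)` does not block the event loop, but it does suspend `producer` until `work` has finished. The next job is therefore only submitted after the previous one completes, so only one `work` call runs at a time and four one-second jobs take about four seconds.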
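The difference comes from when each future is awaited. `run_in_executor` hands the call to the pool and returns an awaitable future right away. Awaiting each future before submitting the next one is what serializes the jobs. The sketch below compares the two patterns. It makes two assumptions. First, it uses a `ThreadPoolExecutor` in place of the question's `ProcessPoolExecutor`, so it runs without pickling; `time.sleep` releases the GIL, so the threads do overlap. Second, `work(delay)`, `one_at_a_time` and `all_at_once` are hypothetical helpers written only for this illustration.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def work(delay):
    # Hypothetical stand-in for the blocking job from the question.
    time.sleep(delay)
    return delay


async def one_at_a_time(executor, n, delay):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    for _ in range(n):
        # Awaiting immediately: the next job is not submitted until this one ends.
        await loop.run_in_executor(executor, work, delay)
    return time.perf_counter() - start


async def all_at_once(executor, n, delay):
    loop = asyncio.get_running_loop()
    start = time.perf_counter()
    # Submit every job first; each call returns a future without waiting.
    futures = [loop.run_in_executor(executor, work, delay) for _ in range(n)]
    await asyncio.gather(*futures)
    return time.perf_counter() - start


if __name__ == "__main__":
    # Threads instead of processes: an assumption made for a quick demo.
    with ThreadPoolExecutor(max_workers=4) as pool:
        print("sequential:", round(asyncio.run(one_at_a_time(pool, 4, 0.2)), 2))
        print("concurrent:", round(asyncio.run(all_at_once(pool, 4, 0.2)), 2))
```

With four 0.2-second jobs, the sequential version should take roughly 0.8 seconds. The concurrent version should take roughly 0.2 seconds, because only it lets the pool's workers overlap.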

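To run the jobs concurrently, submit all of them to the executor first and only then await the results. One way to do this is `asyncio.as_completed`, which yields the results in the order the jobs finish: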

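```python
async def producer():
    # Submit every job first: run_in_executor returns a future immediately,
    # so all num_jobs calls to work can run in the pool at the same time.
    tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
    # Then handle the results in the order they finish. (The loop=
    # argument to as_completed was removed in Python 3.10.)
    for f in asyncio.as_completed(tasks):
        results = await f
        await queue.put(results)
```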
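For reference, here is a self-contained sketch of the same idea written against the current asyncio API, using `asyncio.run` and `asyncio.get_running_loop` instead of a module-level loop. It restructures the question's example and is not the answerer's own code. It makes two changes that are assumptions for illustration: `work` now takes an index and returns `i * i` so the results are visible, and `run_jobs` is a hypothetical helper that wires the producer and consumer together.

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


def work(i):
    # Blocking job; it must be a module-level function so the process
    # pool can pickle it and send it to a worker.
    time.sleep(1)
    return i * i


async def producer(executor, queue, num_jobs):
    loop = asyncio.get_running_loop()
    # Start every job before awaiting any of them.
    futures = [loop.run_in_executor(executor, work, i) for i in range(num_jobs)]
    for f in asyncio.as_completed(futures):
        # Pass each result on as soon as it is ready.
        await queue.put(await f)


async def consumer(queue, num_jobs):
    results = []
    while len(results) < num_jobs:
        results.append(await queue.get())
    return results


async def run_jobs(executor, num_jobs):
    # Hypothetical helper: the queue is created inside the running loop.
    queue = asyncio.Queue()
    # The producer returns None, so only the consumer's result is kept.
    _, results = await asyncio.gather(
        producer(executor, queue, num_jobs), consumer(queue, num_jobs)
    )
    return results


if __name__ == "__main__":
    # The __main__ guard is needed with the "spawn" start method
    # (Windows, macOS), where workers re-import this module.
    num_jobs = 4
    with ProcessPoolExecutor(max_workers=num_jobs) as executor:
        start = time.perf_counter()
        results = asyncio.run(run_jobs(executor, num_jobs))
        print("results", sorted(results))
        print("duration", time.perf_counter() - start)
```

`run_jobs` takes the executor as a parameter, so the same code also works with a `ThreadPoolExecutor` for I/O-bound jobs. A `ProcessPoolExecutor` is what lets CPU-bound `work` run in parallel. Results arrive in completion order. If submission order matters, `await asyncio.gather(*futures)` returns them in the order the jobs were created.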
