python - 将 asyncio 与多工人 ProcessPoolExecutor 相结合
问题描述
是否可以采用阻塞功能,例如work
并让它在一个ProcessPoolExecutor
有多个工作人员的情况下同时运行?
import asyncio
from time import sleep, time
from concurrent.futures import ProcessPoolExecutor
num_jobs = 4
queue = asyncio.Queue()
executor = ProcessPoolExecutor(max_workers=num_jobs)
loop = asyncio.get_event_loop()
def work():
sleep(1)
async def producer():
for i in range(num_jobs):
results = await loop.run_in_executor(executor, work)
await queue.put(results)
async def consumer():
completed = 0
while completed < num_jobs:
job = await queue.get()
completed += 1
s = time()
loop.run_until_complete(asyncio.gather(producer(), consumer()))
print("duration", time() - s)
在超过 4 个内核的机器上运行上述代码需要大约 4 秒。您将如何编写producer
以使上述示例仅花费约 1 秒?
解决方案
await loop.run_in_executor(executor, work)
阻塞循环直到work
完成,因此您一次只能运行一个函数。
要同时运行作业,您可以使用asyncio.as_completed
:
async def producer():
tasks = [loop.run_in_executor(executor, work) for _ in range(num_jobs)]
for f in asyncio.as_completed(tasks, loop=loop):
results = await f
await queue.put(results)
推荐阅读
- jira - 如何通过只有父页面标题的 Confluence REST API 创建子页面
- javascript - 筹码值得到 [object Object]
- python - 我们如何使列表中的元素在python中多次倒退?
- django - 我在哪里可以在 django 中存储刷新和访问令牌
- python-3.x - 403 客户端错误:禁止访问 url:https://ropsten.infura.io/v3/PROJECT_ID
- java - 如何解决这个保证金差距?
- kernel - (内核)关于 OpenBSD 的开发人员级别文档
- html - 在 Angular 中以编程方式转换(旋转)元素
- javascript - 如何在反应传单上添加比例?
- github - 如何为 github 操作设置环境变量?