python - 如何等待进程池有一个空闲的工作人员
问题描述
我有一些繁重的任务要提交给 ProcessPoolExecutor。问题在于,由于执行者持有待处理的任务,它消耗了太多的内存。我想等待执行程序已满,阻塞主线程直到有一个工作人员空闲,所以我可以有固定数量的并行任务而不会累积待处理的繁重任务:
from concurrent.futures.process import ProcessPoolExecutor
def f(arg):
print(f'Arg -> {arg}')
# Heavy stuff
executor = ProcessPoolExecutor(max_workers=3)
for i in range(20):
if executor.wait_some_free_worker(): # THIS is what I need
executor.submit(f, i)
executor.shutdown(wait=True)
在上面的示例中,我只会并行执行 3 个任务,并且只有当有空闲的工作人员时才会继续下一个循环。
提前感谢,对我的英语感到抱歉
解决方案
用于concurrent.futures.wait
等待期货完成,并在没有足够的未完成期货时提交新的期货:
from concurrent.futures import ProcessPoolExecutor , wait, FIRST_COMPLETED
def f(arg):
print(f'Arg -> {arg}')
max_concurrent = 3 # how many futures to use at most
pending = set() # currently running futures
with ProcessPoolExecutor(max_workers=3) as executor:
for i in range(20):
# wait until there are less-than-desired active futures
while len(pending) >= max_concurrent:
done, pending = wait(pending, return_when=FIRST_COMPLETED)
pending.add(executor.submit(f, i))
这可以分解为一个可重用的辅助函数:
from concurrent.futures import Executor, wait, FIRST_COMPLETED, ALL_COMPLETED
def map_lazy(executor: Executor, func, *iterables, max_pending=10):
pending = set()
for args in zip(*iterables):
while len(pending) >= max_pending:
_, pending = wait(pending, return_when=FIRST_COMPLETED)
pending.add(executor.submit(func, *args))
wait(pending, return_when=ALL_COMPLETED)
推荐阅读
- python - 会话中准备好的请求中的标头
- python - 将日期(字符串)转换为日期时间格式
- excel - FDSAuditLink Excel 问题?
- php - PHP preg_split,在 | 处拆分 并检查至少 3 个字符
- sql - 如何对列中的所有值求和,不包括 Oracle 中的字符串值?
- javascript - 对象解构 eslint 抛出 react/prop-types
- android - 在列表中
users = [] ------ 未处理的异常:类型 '(dynamic) => Null' 不是类型 '(String, dynamic) => void' of 'f' 的子类型 - java - 使用 WebClient 的 Spring Cloud Function AWS Lambda
- c++ - 如何将任何大小的位集拆分为 64 位位集列表?
- extjs - Tagfield ExtJS 中的排序标签