How to wait until a ProcessPoolExecutor has a free worker

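Problem description

I have some heavy tasks to submit to a ProcessPoolExecutor. The problem is that the executor holds on to every pending task, so it consumes too much memory. I would like to block the main thread while the executor is full, until a worker becomes free, so that a fixed number of tasks run in parallel without heavy pending tasks piling up: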

from concurrent.futures.process import ProcessPoolExecutor


def f(arg):
    print(f'Arg -> {arg}')
    # Heavy stuff


executor = ProcessPoolExecutor(max_workers=3)
for i in range(20):
    if executor.wait_some_free_worker():  # hypothetical method, THIS is what I need
        executor.submit(f, i)

executor.shutdown(wait=True)

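In the example above, only 3 tasks would run in parallel, and the loop would move on to the next iteration only once a worker is free.

Thanks in advance, and sorry for my English.

Tags: python, python-3.x, multithreading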

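Solution

Use concurrent.futures.wait to wait for futures to complete, and submit a new one only when fewer than the desired number of futures are outstanding: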

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED

def f(arg):
    print(f'Arg -> {arg}')

max_concurrent = 3  # maximum number of futures in flight at once
pending = set()     # futures submitted and not yet seen as finished

if __name__ == '__main__':  # needed where worker processes are spawned (e.g. Windows, macOS)
    with ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(20):
            # block until fewer than max_concurrent futures are outstanding
            while len(pending) >= max_concurrent:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
            pending.add(executor.submit(f, i))

This can be factored into a reusable helper function:

from concurrent.futures import Executor, wait, FIRST_COMPLETED, ALL_COMPLETED

def map_lazy(executor: Executor, func, *iterables, max_pending=10):
    pending = set()
    for args in zip(*iterables):
        while len(pending) >= max_pending:
            _, pending = wait(pending, return_when=FIRST_COMPLETED)
        pending.add(executor.submit(func, *args))
    wait(pending, return_when=ALL_COMPLETED)
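
The helper above discards return values and any exceptions raised in the workers. If the results are needed, one possible variation is a generator that yields each result as its future completes while keeping the same bound on pending futures. This is only a sketch: the name map_lazy_results is made up for illustration, results arrive in completion order rather than submission order, and the built-in pow is used as a stand-in for a heavy task.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, wait, FIRST_COMPLETED


def map_lazy_results(executor: Executor, func, *iterables, max_pending=10):
    """Yield results in completion order with at most max_pending futures submitted.

    Hypothetical helper (not part of concurrent.futures).
    """
    pending = set()
    for args in zip(*iterables):
        # Drain finished futures until there is room for one more submission.
        while len(pending) >= max_pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()  # re-raises an exception raised in the worker
        pending.add(executor.submit(func, *args))
    # Input exhausted: collect whatever is still outstanding.
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for future in done:
            yield future.result()


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        # pow(i, 2) stands in for a heavy task; sorted() because order is not guaranteed.
        squares = sorted(map_lazy_results(executor, pow, range(10), [2] * 10, max_pending=3))
        print(squares)
```

Because this is a generator, tasks are submitted only as the caller iterates over it, so a slow consumer also slows down submission. Whether that is desirable depends on the workload.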
