首页 > 解决方案 > 向 concurrent.futures.ProcessPool 中的所有进程提交代码以执行

问题描述

语境:

(是的,我知道importlib.reload警告

为了让它工作,我想我必须在进程池管理的importlib.reload每个multiprocessing进程中执行。

有没有办法向进程池中的所有进程提交一些东西?

标签: pythonparallel-processingmultiprocessingpython-multiprocessingprocess-pool

解决方案


我不知道这将如何与您提到的热重装尝试一起发挥作用,但是您真正提出的一般问题是可以回答的。

有没有办法向进程池中的所有进程提交一些东西?

这里的挑战在于确保所有进程都得到something一次且仅一次,并且在每个进程得到它之前不会进一步执行。

您可以借助multiprocessing.Barrier(parties[, action[, timeout]]). 屏障将阻止各方调用barrier.wait(),直到每一方都这样做,然后立即释放它们。

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def foo(x):
    for _ in range(int(42e4)):
        pass
    return x


def reload(something):
    print(f"{mp.current_process().name} --- reloading {something} and waiting.")
    barrier.wait()
    print(f"{mp.current_process().name} --- released.")


def init_barrier(barrier):
    globals()['barrier'] = barrier


if __name__ == '__main__':

    MAX_WORKERS = 4
    barrier = mp.Barrier(MAX_WORKERS)

    with ProcessPoolExecutor(
            MAX_WORKERS, initializer=init_barrier, initargs=(barrier,)
    ) as executor:
        print(list(executor.map(foo, range(10))))
        # then something for all processes
        futures = [executor.submit(reload, "something") for _ in range(MAX_WORKERS)]
        for f in futures:
            f.result()

        print(list(executor.map(foo, range(10))))

示例输出:

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
ForkProcess-3 --- reloading something and waiting.
ForkProcess-2 --- reloading something and waiting.
ForkProcess-1 --- reloading something and waiting.
ForkProcess-4 --- reloading something and waiting.
ForkProcess-1 --- released.
ForkProcess-4 --- released.
ForkProcess-3 --- released.
ForkProcess-2 --- released.
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Process finished with exit code 0

如果您可以保留全局变量barriermultiprocessing.get_context()._name返回值"fork",则不需要使用全局变量,initializer因为全局变量将被继承并通过分叉访问。


推荐阅读