首页 > 解决方案 > 如何使用多处理包在 python 中并行化 for 循环?

问题描述

注意:我不需要进程/线程之间的任何通信,我只对完成信号感兴趣(这就是我将这个问题作为一个新问题发布的原因,因为我发现的所有其他示例都相互通信)。

如何multiprocessing在 Python 3 中使用包来并行化以下代码(最终目标是让它运行得更快):

a = 123
b = 456
for id in ids: # len(ids) = 10'000
   # executes a binary with CLI flags
   run_binary_with_id(id, a, b) 
   # i.e. runs "./hello_world_exec --id id --a a --b b" which takes about 30 seconds on average

我尝试了以下方法:

import multiprocessing as mp

def run_binary_with_id(id, a, b):
    run_command('./hello_world_exec --id {} --a {} --b {}'.format(id, a, b))

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    a = 123
    b = 456
    ids = range(10000)
    for id in ids:
       p = ctx.Process(target=run_binary_with_id, args=(id,a,b))
       p.start()
    p.join()
    # The binary was executed len(ids) number of times, do other stuff assuming everything's completed at this point

或者

for id in ids:
   map.apply_async(run_binary_with_id, (id,a,b))

类似的问题中,答案如下:

def consume(iterator):
    deque(iterator, max_len=0)
x=pool.imap_unordered(f,((i,j) for i in range(10000) for j in range(10000)))
consume(x)

我根本不明白(为什么我需要这个consume())。

标签: python-3.xpython-multiprocessingpython-multithreading

解决方案


尝试生成 10000 个并行运行的进程几乎肯定会导致系统过载并使其运行速度比顺序运行进程慢,因为当进程数量远远超过时,操作系统必须不断执行进程之间的上下文切换所涉及的开销您的系统拥有的 CPU/内核数。

您可以改为使用multiprocessing.Pool来限制为任务生成的工作进程的数量。默认情况下Pool,构造函数将进程数限制为系统具有的核心数,但如果您愿意,可以使用processes参数对其进行微调。然后,您可以使用它的map方法轻松映射一系列参数以应用于给定函数以并行运行。但是,它只能将一个参数映射到函数,因此您必须使用functools.partial为其他参数提供默认值,在您的情况下,这些参数在调用之间不会更改:

from functools import partial
if __name__ == '__main__':
    _run_binary_with_id = partial(run_binary_with_id, a=123, b=456)
    with mp.Pool() as pool:
        pool.map(_run_binary_with_id, range(10000))

推荐阅读