首页 > 解决方案 > Pool / starmap的Python多处理行为

问题描述

我有一个程序使用多处理库来计算一些东西。大约有 10K 输入要计算,每个输入需要 0.2 秒到 10 秒。

我目前的方法使用一个池:

# Inputs
signals = [list(s) for s in itertools.combinations_with_replacement(possible_inputs, 3)]

# Compute
with mp.Pool(processes = N) as p:
    p.starmap(compute_solutions, [(s, t0, tf, folder) for s in signals])
    print ("    | Computation done.")

我注意到在 300 / 400 最后输入检查时,程序变得慢了很多。我的问题是:Pool和 的starmap()行为如何?

根据我的观察,我相信如果我有 10K 输入和N = 4(4 个进程),那么 2 500 个第一个输入被分配给第一个进程,2 500 个紧挨着第二个,......并且每个进程都将其输入分配给连载时尚。这意味着如果某些进程先于其他进程清除了队列,它们就不会执行新任务。

这个对吗?

如果这是正确的,我怎样才能拥有一个可以用这个伪代码表示的更智能的系统:

workers = Initialize N workers
tasks = A list of the tasks to perform

for task in tasks:
    if a worker is free:
        submit task to this worker
    else:
        wait

谢谢您的帮助 :)

注意:不同的地图功能有什么区别。我相信map(), imap_unordered(), imap,starmap存在。

它们之间有什么区别,我们什么时候应该使用其中一种?

标签: pythonmultiprocessing

解决方案


这意味着如果某些进程先于其他进程清除了队列,它们就不会执行新任务。

这个对吗?

不。 的主要目的multiprocess.Pool()是将传递的工作负载分散到其工作人员池中——这就是为什么它带有所有这些映射选项的原因——其各种方法之间的唯一区别在于工作负载的实际分配方式以及结果返回的方式集。

在您的情况下,您生成的可迭代[(s, t0, tf, folder) for s in signals]对象会将其每个元素(最终取决于signals大小)发送到compute_solutions(s, t0, tf, folder)池中的下一个空闲工作人员(调用为 ),一次一个(或者如果chunksize传递参数则更多),直到整个迭代被耗尽。您无法控制哪个工人执行哪个部分。

工作量也可能不会均匀分布 - 一个工作人员可能会根据资源使用情况、执行速度、各种内部事件处理比另一个工作更多的条目......

然而,使用map,imap和你的starmap方法multiprocessing.Pool会得到均匀有序分布的错觉,因为它们在内部同步每个工作人员的返回以匹配可迭代(即结果的第一个元素将包含来自被调用函数的结果返回)可迭代的第一个元素)。如果你想看看下面实际发生了什么,你可以尝试这些方法的异步/无序版本。

因此,默认情况下您会获得更智能的系统multiprocessing.Pool.apply_async(),但如果您想完全控制您的工人池,您可以随时使用。

附带说明一下,如果您正在考虑优化对可迭代对象本身的访问(因为池映射选项将消耗其中的很大一部分),您可以查看此答案

最后,

它们之间有什么区别,我们什么时候应该使用其中一种?

不要在这里引用我,而是转到官方文档,因为对它们之间的区别有很好的解释。


推荐阅读