首页 > 解决方案 > 为什么我不能在 python 多处理中关闭池之前使用 join()

问题描述

我有一个类,它有一个方法可以进行一些并行计算并且经常被调用。因此,我希望我的池在类的构造函数中初始化一次,而不是在每次调用此方法时都创建一个新池。在这种方法中,我想使用 apply_async() 为所有工作进程启动一个任务,然后等待(阻塞)并聚合每个任务的结果。我的代码如下所示:

class Foo:
     def __init__(self, ...):
         # ...
         self.pool = mp.Pool(mp.cpu_count())

     def do_parallel_calculations(self, ...):
         for _ in range(mp.cpu_count()):
              self.pool.apply_async(calc_func, args=(...), callback=aggregate_result)
         
         # wait for results to be aggregated to a global var by the callback
         self.pool.join()  # <-- ValueError: Pool is still running
         
         # do something with the aggregated result of all worker processes

但是,当我运行它时,我在 self.pool.join() 中收到一个错误,上面写着:“ValueError:Pool is still running”。现在,在我看到的所有示例中, self.pool.close() 在 self.pool.join() 之前被调用,我认为这就是我收到此错误的原因,但我不想关闭我的池,因为我希望它在那里下次调用此方法!我不能不使用 self.pool.join(),因为我需要一种方法来等待所有进程完成,并且我不想浪费地手动旋转,例如使用“while not global_flag: pass”。

我能做些什么来实现我想要做的事情?为什么不让多处理让我加入一个仍然开放的池?这似乎是一件非常合理的事情。

标签: pythonpython-multiprocessing

解决方案


Let's make this concrete with a real example:

import multiprocessing as mp


def calc_func(x):
    return x * x


class Foo:
    def __init__(self):
        self.pool = mp.Pool(mp.cpu_count())

    def do_parallel_calculations(self, values):
        results = []
        for value in values:
            results.append(self.pool.apply_async(calc_func, args=(value,)))
        for result in results:
            print(result.get())

if __name__ == '__main__':
    foo = Foo()
    foo.do_parallel_calculations([1,2,3])

推荐阅读