首页 > 解决方案 > 在python中并行调用许多对象的方法

问题描述

我有两节课。一个叫algorithm,另一个叫Chain。在algorithm中,我创建了多个链,它们将是一系列采样值。我想在链级别并行运行采样。

换句话说,algorithm该类实例化了 n 个链,我想为类中的每个链并行运行_sample属于该类的方法。Chainalgorithm

下面是一个尝试我想做的示例代码。

我在这里看到了类似的问题:Apply a method to a list of objects in parallel using multi-processing,但如函数所示_sample_chains_parallel_worker,此方法不适用于我的情况(我猜是因为嵌套类结构)。

问题1:为什么这不适用于这种情况?

_ 中的方法sample_chains_parallel甚至不并行运行。

问题2:为什么?

问题 3:如何并行采样这些链中的每一个?

import time
import multiprocessing

class Chain():

    def __init__(self):
        self.thetas = []

    def _sample(self):
        for i in range(3):
            time.sleep(1)
            self.thetas.append(i)

    def clear_thetas(self):
        self.thetas = []

class algorithm():

    def __init__(self, n=3):
        self.n = n
        self.chains = []

    def _init_chains(self):
        for _ in range(self.n):
            self.chains.append(Chain())

    def _sample_chains(self):
        for chain in self.chains:
            chain.clear_thetas()
            chain._sample()

    def _sample_chains_parallel(self):
        pool = multiprocessing.Pool(processes=self.n)
        for chain in self.chains:
            chain.clear_thetas()
            pool.apply_async(chain._sample())
        pool.close()
        pool.join()

    def _sample_chains_parallel_worker(self):

        def worker(obj):
            obj._sample()

        pool = multiprocessing.Pool(processes=self.n)
        pool.map(worker, self.chains)

        pool.close()
        pool.join()


if __name__=="__main__":
    import time

    alg = algorithm()
    alg._init_chains()

    start = time.time()
    alg._sample_chains()
    end = time.time()
    print "sequential", end - start

    start = time.time()
    alg._sample_chains_parallel()
    end = time.time()
    print "parallel", end - start

    start = time.time()
    alg._sample_chains_parallel_worker()
    end = time.time()
    print "parallel, map and worker", end - start

标签: pythonclassmultiprocessing

解决方案


_sample_chains_parallel您调用chain._sample()而不是仅传递函数时:pool.apply_async(chain._sample()). 因此,您将结果作为参数传递,而不是让apply_async计算它。

但是删除()对你没有多大帮助,因为 Python 2 不能腌制实例方法(可能适用于 Python +3.5)。除非您调用结果对象,否则它不会引发错误,get()因此如果您看到这种方法的时间较短,请不要高兴,这是因为它会立即退出并出现未引发的异常。

对于并行版本,您必须重新定位worker到模块级别并pool.apply_async(worker (chain,))分别调用它pool.map(worker, self.chains)

请注意,您忘记clear_thetas()_sample_chains_parallel_worker. 无论如何,更好的解决方案是让 letChain._sample照顾 call self._clear_thetas()


推荐阅读