首页 > 解决方案 > 带有管理器和异步方法的 multiprocessing.pool

问题描述

我正在尝试使用 Manager() 在进程之间共享字典并尝试了以下代码:

from multiprocessing import Manager, Pool

def f(d):
    d['x'] += 2

if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    d['x'] = 2
    p= Pool(4)

    for _ in range(2000):
        p.map_async(f, (d,))  #apply_async, map

    p.close()
    p.join()

    print (d)  # expects this result --> {'x': 4002}

使用 map_async 和 apply_async,打印的结果总是不同的(例如 {'x': 3838}, {'x': 3770})。但是,使用 map 会给出预期的结果。另外,我尝试使用 Process 而不是 Pool,结果也不同。

有什么见解吗?非阻塞部分和竞态条件不是由经理处理的吗?

标签: pythonpython-3.xmultiprocessingmultiprocessing-manager

解决方案


当您调用map(而不是map_async)时,它将阻塞,直到处理器完成您传递的所有请求,在您的情况下,这只是对 function 的一次调用f。因此,即使您的池大小为 4,您实际上也是一次处理 2000 个进程。要真正并行执行,您应该执行一次p.map(f, [d]*2000)而不是循环。

但是当您调用 时map_async,您不会阻塞并返回一个结果对象。get对结果对象的调用阻塞,直到进程完成,并将返回函数调用的结果。因此,现在您一次最多运行 4 个进程。但是字典的更新不会跨处理器序列化。我修改了代码以d[x] += 2通过使用多处理锁来强制序列化。您将看到结果现在是 4002。

from multiprocessing import Manager, Pool, Lock


def f(d):
    lock.acquire()
    d['x'] += 2
    lock.release()

def init(l):
    global lock
    lock = l

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        d['x'] = 2
        lock = Lock()
        p = Pool(4, initializer=init, initargs=(lock,)) # Create the multiprocessing lock that is sharable by all the processes

        results = [] # if the function returnd a result we wanted
        for _ in range(2000):
            results.append(p.map_async(f, (d,)))  #apply_async, map
        """
        for i in range(2000): # if the function returned a result we wanted
            results[i].get() # wait for everything to finish
        """
        p.close()
        p.join()
        print(d)

推荐阅读