首页 > 解决方案 > 多处理管理器在 pool.apply_async 的非常简单的示例中失败

问题描述

我在与 python 相关的代码中看到了一些意外行为,尤其是multiprocessingManager类相关的行为。我写了一个超级简单的例子来尝试更好地理解发生了什么:

import multiprocessing as mp
from collections import defaultdict


def process(d):
    print('doing the process')
    d['a'] = []
    d['a'].append(1)
    d['a'].append(2)


def main():
    pool = mp.Pool(mp.cpu_count())
    with mp.Manager() as manager:
        d = manager.dict({'c': 2})
        result = pool.apply_async(process, args=(d))
        print(result.get())

        pool.close()
        pool.join()

        print(d)


if __name__ == '__main__':
    main()

这失败了,打印的堆栈跟踪result.get()如下:

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))
  File "<string>", line 2, in __iter__
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 825, in _callmethod
    proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_test.py", line 34, in <module>
    main()
  File "mp_test.py", line 25, in main
    print(result.get())
  File "/usr/local/Cellar/python/3.7.5/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value
AttributeError: 'NoneType' object has no attribute '_registry'

我仍然不清楚这里发生了什么。在我看来,这似乎是 Manager 类的一个非常非常简单的应用程序。它几乎是官方 python 文档中使用的实际示例的副本,唯一的区别是我使用的是池并使用 apply_async 运行进程。我这样做是因为这就是我在实际项目中使用的。

澄清一下,如果我没有result = and ,我就不会得到堆栈跟踪print(result.get())。我只是{'c': 2}在运行脚本时看到打印出来的,这向我表明出了点问题并且没有显示出来。

标签: pythonpython-3.xmultiprocessingpython-multiprocessing

解决方案


有几件事要开始:首先,这不是您运行的代码。您发布的代码有

  result = pool.apply_async(process2, args=(d))

但没有process2()定义。假设“过程”是有意的,接下来的事情是

args=(d)

部分。这和打字一样

args=d

但这不是所需要的。您需要传递一系列预期的参数。所以你需要把那部分改成

args=(d,) # build a 1-tuple

或者

args=[d]  # build a list

然后输出更改为

{'c': 2, 'a': []}

为什么 1 和 2 不在“a”列表中?因为只有 dict 本身存在于管理器服务器上。

d['a'].append(1)

首先从服务器获取“a”的映射,这是一个空列表。但是那个空列表不会以任何方式共享 - 它是本地的process()。您将 1 附加到它,然后将其丢弃 - 服务器对此一无所知。2也是一样。

要得到你想要的,你需要“做点什么”告诉管理服务器你改变了什么;例如,

d['a'] = L = []
L.append(1)
L.append(2)
d['a'] = L

推荐阅读