首页 > 解决方案 > 在进程之间共享异步对象

问题描述

我正在使用 theasynciomultiprocessinglibrary 来运行两个进程,每个进程都有一个服务器实例在不同的端口上侦听传入消息。

为了识别每个客户端,我想dict在两个进程之间共享一个来更新已知客户端的列表。为了实现这一点,我决定使用一个查找键,它为此连接Tuple[StreamReader, StreamWriter]分配了一个对象。Client

但是,只要我插入或简单地访问共享字典,程序就会崩溃并显示以下错误消息:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

我自然查了一下错误信息,发现了这个问题,但我真的不明白这是什么原因。据我了解,此崩溃的原因是StreamReader并且StreamWriter不能被腌制/序列化以便在进程之间共享。如果这实际上是原因,有没有办法腌制它们,也许通过修补减速器功能来代替使用不同的腌制器?

标签: pythonpython-3.xpicklepython-multiprocessingmultiprocessing-manager

解决方案


您可能对使用SyncManager感兴趣。只要确保在最后调用关闭管理器,shutdown就不会留下僵尸进程。

from multiprocessing.managers import SyncManager
from multiprocessing import Process
import signal

my_manager = SyncManager()

# to avoid closing the manager by ctrl+C. be sure to handle KeyboardInterrupt errors and close the manager accordingly
def manager_init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

my_manager.start(manager_init)

my_dict = my_manager.dict()
my_dict["clients"] = my_manager.list()
def my_process(my_id, the_dict):
    for i in range(3):
        the_dict["clients"].append(f"{my_id}_{i}")

processes = []
for j in range(4):
    processes.append(Process(target=my_process, args=(j,my_dict)))

for p in processes:
    p.start()

for p in processes:
    p.join()

print(my_dict["clients"])
# ['0_0', '2_0', '0_1', '3_0', '1_0', '0_2', '1_1', '2_1', '3_1', '1_2', '2_2', '3_2']

my_manager.shutdown()




推荐阅读