python-3.x - Python 多处理管理器和代理;为什么我的自定义类不通过网络共享?
问题描述
所以我试图通过网络共享对自定义类对象的访问,方法是让主机为其提供服务,并且多个客户端连接并更新它。似乎标准多处理库仅针对此用例具有管理器和代理。
我相信通过阅读多处理文档(以及 此处的其他 帖子 ),在主机端我需要一个从多处理 BaseManager 类派生的自定义管理器,然后使用从多处理 BaseProxy 类派生的自定义代理注册我的自定义类。我相信在客户端我需要连接到管理器并使用代理与我的自定义类进行交互。
我遇到的问题是在客户端上看不到在我的自定义类的主机版本上更改的属性,反之亦然。看来我在实现上做错了或误解了文档/源代码。有人可以帮助提供一个工作示例并解释解决方案吗?
这是我的最小示例。
在一个终端中运行管理器:
>> python -i manager.py
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'
# shared class
class Foo:
"""my custom shared class"""
def __init__(self):
self.x = None
def get_x(self):
return self.x
def set_x(self, value):
self.x = value
# proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
class FooProxy(FooProxyBase):
"""shared class proxy"""
def get_x(self):
return self._callmethod('get_x')
def set_x(self, value):
return self._callmethod('set_x', (value,))
# custom shared class manager
class MyBaseManager(BaseManager):
pass
# manager, run manager server for shared class and set data
print(f'manager running on proc {os.getpid()}')
MyBaseManager.register("Foo", Foo, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.start()
print(f'manager server running on proc {m._process.pid}')
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')
# get copy of managed class proxy and set value to 10
f = m.Foo()
print(f'x={f.get_x()} => should be None')
f.set_x(10) # set x value to 10
print(f'x={f.get_x()} => should be 10')
在其他终端中运行客户端
>> python -i client.py
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'
# proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
class FooProxy(FooProxyBase):
"""shared class proxy"""
def get_x(self):
return self._callmethod('get_x')
def set_x(self, value):
return self._callmethod('set_x', (value,))
# custom shared class manager
class MyBaseManager(BaseManager):
pass
# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')
MyBaseManager.register("Foo", None, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')
# get copy of managed class proxy and get value, should be 10
f = m.Foo()
print(f'x={f.get_x()} => should be 10')
您将看到客户端在 x=None 应该为 10 时返回。我没有收到明显的异常,并且在 Ubuntu 18 和 OSX 上进行了尝试并获得了相同的行为。
请注意,我仅限于 python 3.6 并在我的生产系统上使用标准库。我还成功地尝试了多处理文档中的其他示例,因此请相信这不是计算机/网络问题。
谢谢
解决方案
客户端有一个不同的共享 Foo 实例。您可以将该实例传递给其他进程并查看它是否正常工作。下面我将它传递给另外两个进程:一个打印当前值并更改它,另一个等待片刻,然后打印第一个工作人员更新的值:
import multiprocessing as mp
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os
import time
IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'
# proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
class FooProxy(FooProxyBase):
"""shared class proxy"""
def get_x(self):
return self._callmethod('get_x')
def set_x(self, value):
return self._callmethod('set_x', (value,))
# custom shared class manager
class MyBaseManager(BaseManager):
pass
def worker1(f):
print(f'worker pid={os.getpid()} x={f.get_x()}')
f.set_x(5)
def worker2(f):
time.sleep(1)
print(f'worker pid={os.getpid()} x={f.get_x()}')
if __name__ == '__main__':
# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')
MyBaseManager.register("Foo", None, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')
# get copy of managed class proxy and get value, should be 10
f = m.Foo()
f.set_x(7)
print(f'client x={f.get_x()}')
mp.Process(target=worker1,args=(f,)).start()
mp.Process(target=worker2,args=(f,)).start()
输出py -i client.py
:
client running on proc 11332
(proxy) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
(referant) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
client x=7
>>> worker pid=2560 x=7
worker pid=11444 x=5
如果你想要一个单例 Foo,你可以公开一个get_foo
获取通用全局 Foo 的函数:
经理.py:
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os
IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'
# shared class
class Foo:
"""my custom shared class"""
def __init__(self):
self.x = None
def get_x(self):
return self.x
def set_x(self, value):
self.x = value
# proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
class FooProxy(FooProxyBase):
"""shared class proxy"""
def get_x(self):
return self._callmethod('get_x')
def set_x(self, value):
return self._callmethod('set_x', (value,))
## Global Foo and function to get the instance.
global_foo = Foo()
def get_foo():
return global_foo
# custom shared class manager
class MyBaseManager(BaseManager):
pass
if __name__ == '__main__':
# manager, run manager server for shared class and set data
print(f'manager running on proc {os.getpid()}')
MyBaseManager.register("Foo", Foo, FooProxy)
MyBaseManager.register("get_foo", get_foo)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.start()
print(f'manager server running on proc {m._process.pid}')
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')
# get global instance and set value to 10
f = m.get_foo()
print(f'x={f.get_x()} => should be None')
f.set_x(10) # set x value to 10
print(f'x={f.get_x()} => should be 10')
客户端.py:
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os
IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'
# proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))
class FooProxy(FooProxyBase):
"""shared class proxy"""
def get_x(self):
return self._callmethod('get_x')
def set_x(self, value):
return self._callmethod('set_x', (value,))
# custom shared class manager
class MyBaseManager(BaseManager):
pass
# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')
MyBaseManager.register("Foo", None, FooProxy)
MyBaseManager.register("get_foo")
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')
# get global instance and get value, should be 10
f = m.get_foo()
print(f'x={f.get_x()} => should be 10')
客户端输出:
client running on proc 7180
(proxy) <__main__.MyBaseManager object at 0x000001A3F0267610>
(referant) <__main__.MyBaseManager object at 0x000001A3F0267610>
x=10 => should be 10
推荐阅读
- python - 通过不同目录多次运行python脚本的最佳方法?使用视窗
- javascript - 循环内循环 JS
- python - Pandas - 计算另一个日期之间的日期
- azure-maps - 需要来自 /address/structured OR 可靠过滤器的完全匹配
- sql - 如何识别具有每行最大值的列
- terraform - 如何销毁 Terraform 中的资源?
- r - 如何在另一个栅格具有 NA 的情况下将栅格值设置为“NA”
- python - 将 Pandas df 列的字符串列表转换为整数
- python - 外推(和内插)Pandas DataFrame
- r - 将 R 公式与 dplyr 一起使用