首页 > 解决方案 > Python 多处理管理器和代理;为什么我的自定义类不通过网络共享?

问题描述

所以我试图通过网络共享对自定义类对象的访问,方法是让主机为其提供服务,并且多个客户端连接并更新它。似乎标准多处理库仅针对此用例具有管理器和代理。

我相信通过阅读多处理文档以及 此处的其他 帖子 ),在主机端我需要一个从多处理 BaseManager 类派生的自定义管理器,然后使用从多处理 BaseProxy 类派生的自定义代理注册我的自定义类。我相信在客户端我需要连接到管理器并使用代理与我的自定义类进行交互。

我遇到的问题是在客户端上看不到在我的自定义类的主机版本上更改的属性,反之亦然。看来我在实现上做错了或误解了文档/源代码。有人可以帮助提供一个工作示例并解释解决方案吗?

这是我的最小示例。

在一个终端中运行管理器:

>> python -i manager.py

from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

# shared class
class Foo:
    """my custom shared class"""
    def __init__(self):
        self.x = None

    def get_x(self):
        return self.x

    def set_x(self, value):
        self.x = value

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

# manager, run manager server for shared class and set data
print(f'manager running on proc {os.getpid()}')

MyBaseManager.register("Foo", Foo, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.start()

print(f'manager server running on proc {m._process.pid}')
print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')

# get copy of managed class proxy and set value to 10
f = m.Foo()
print(f'x={f.get_x()} => should be None')
f.set_x(10) # set x value to 10
print(f'x={f.get_x()} => should be 10')

在其他终端中运行客户端

 >> python -i client.py
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')

MyBaseManager.register("Foo", None, FooProxy)
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()

print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')

# get copy of managed class proxy and get value, should be 10
f = m.Foo()
print(f'x={f.get_x()} => should be 10')

您将看到客户端在 x=None 应该为 10 时返回。我没有收到明显的异常,并且在 Ubuntu 18 和 OSX 上进行了尝试并获得了相同的行为。

请注意,我仅限于 python 3.6 并在我的生产系统上使用标准库。我还成功地尝试了多处理文档中的其他示例,因此请相信这不是计算机/网络问题。

谢谢

标签: python-3.xmultiprocessingpython-multiprocessing

解决方案


客户端有一个不同的共享 Foo 实例。您可以将该实例传递给其他进程并查看它是否正常工作。下面我将它传递给另外两个进程:一个打印当前值并更改它,另一个等待片刻,然后打印第一个工作人员更新的值:

import multiprocessing as mp
from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os
import time

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

def worker1(f):
    print(f'worker pid={os.getpid()} x={f.get_x()}')
    f.set_x(5)

def worker2(f):
    time.sleep(1)
    print(f'worker pid={os.getpid()} x={f.get_x()}')

if __name__ == '__main__':
    # client, connect to manager server and get data from shared class
    print(f'client running on proc {os.getpid()}')

    MyBaseManager.register("Foo", None, FooProxy)
    m = MyBaseManager(address=(IP, PORT), authkey=KEY)
    m.connect()

    print(f'(proxy) {str(m)}')
    print(f'(referant) {repr(m)}')

    # get copy of managed class proxy and get value, should be 10
    f = m.Foo()
    f.set_x(7)
    print(f'client x={f.get_x()}')
    mp.Process(target=worker1,args=(f,)).start()
    mp.Process(target=worker2,args=(f,)).start()

输出py -i client.py

client running on proc 11332
(proxy) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
(referant) <__main__.MyBaseManager object at 0x000001EB9AC0AFA0>
client x=7
>>> worker pid=2560 x=7
worker pid=11444 x=5

如果你想要一个单例 Foo,你可以公开一个get_foo获取通用全局 Foo 的函数:

经理.py:

from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

# shared class
class Foo:
    """my custom shared class"""
    def __init__(self):
        self.x = None

    def get_x(self):
        return self.x

    def set_x(self, value):
        self.x = value

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

##  Global Foo and function to get the instance.
global_foo = Foo()

def get_foo():
    return global_foo

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

if __name__ == '__main__':
    # manager, run manager server for shared class and set data
    print(f'manager running on proc {os.getpid()}')

    MyBaseManager.register("Foo", Foo, FooProxy)
    MyBaseManager.register("get_foo", get_foo)
    m = MyBaseManager(address=(IP, PORT), authkey=KEY)
    m.start()

    print(f'manager server running on proc {m._process.pid}')
    print(f'(proxy) {str(m)}')
    print(f'(referant) {repr(m)}')

    # get global instance and set value to 10
    f = m.get_foo()
    print(f'x={f.get_x()} => should be None')
    f.set_x(10) # set x value to 10
    print(f'x={f.get_x()} => should be 10')

客户端.py:

from multiprocessing.managers import BaseManager, BaseProxy, MakeProxyType
import os

IP = 'localhost'
PORT = 50000
KEY = b'abracadabra'

#  proxy class
FooProxyBase = MakeProxyType('FooProxyBase', ('get_x', 'set_x'))

class FooProxy(FooProxyBase):
    """shared class proxy"""

    def get_x(self):
        return self._callmethod('get_x')

    def set_x(self, value):
        return self._callmethod('set_x', (value,))

# custom shared class manager
class MyBaseManager(BaseManager):
    pass

# client, connect to manager server and get data from shared class
print(f'client running on proc {os.getpid()}')

MyBaseManager.register("Foo", None, FooProxy)
MyBaseManager.register("get_foo")
m = MyBaseManager(address=(IP, PORT), authkey=KEY)
m.connect()

print(f'(proxy) {str(m)}')
print(f'(referant) {repr(m)}')

# get global instance and get value, should be 10
f = m.get_foo()
print(f'x={f.get_x()} => should be 10')

客户端输出:

client running on proc 7180
(proxy) <__main__.MyBaseManager object at 0x000001A3F0267610>
(referant) <__main__.MyBaseManager object at 0x000001A3F0267610>
x=10 => should be 10

推荐阅读