首页 > 解决方案 > 跨进程传递队列中的对象引用

问题描述

我有几个multiprocessing.Processes 并希望他们使用(队列get())可调用的不可腌制对象并调用它们。这些是在 之前创建的fork(),因此它们不需要酸洗。

usingmultiprocessing.Queue不起作用,因为它试图腌制所有东西:

import multiprocessing as mp

# create non-global callable to make it unpicklable
def make_callable():
    def foo():
        print("running foo")
    return foo

def bar():
    print("running bar")

def runall(q):
    while True:
        c = q.get()
        if c is None:
            break
        c()

if __name__ == '__main__':
    q = mp.Queue()
    call = make_callable()
    p = mp.Process(target=runall, args=(q,))
    p.start()
    q.put(bar)
    q.put(call)
    q.put(None)
    p.join()
running bar
Traceback (most recent call last):
  File "/usr/lib64/python3.7/multiprocessing/queues.py", line 236, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/usr/lib64/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'make_callable.<locals>.foo'

等效的实现是将所有对象放入全局(或传递的)列表并仅传递索引,这很有效:

import multiprocessing as mp

# create non-global callable to make it unpicklable
def make_callable():
    def foo():
        print("running foo")
    return foo

def bar():
    print("running bar")

def runall(q, everything):
    while True:
        c = q.get()
        if c is None:
            break
        everything[c]()

if __name__ == '__main__':
    q = mp.Queue()
    call = make_callable()
    everything = [bar, call]
    p = mp.Process(target=runall, args=(q,everything))
    p.start()
    q.put(0)
    q.put(1)
    q.put(None)
    p.join()
running bar
running foo

问题是,虽然我知道传递的所有可调用对象都不会被垃圾收集(因此它们的地址将保持有效),但我事先没有完整的列表。

我也知道我可能会使用multiprocessing.Manager它并Queue使用一个Proxy对象来实现它,但这似乎有很多开销,尤其是在实际实现中,我也会传递其他可挑选的数据。

有没有办法腌制并仅将地址引用传递给跨多个进程共享的对象?

谢谢!

标签: pythonpython-multiprocessing

解决方案


经过一番思考和搜索,我相信我已经找到了我想要的答案,主要来自:Get object by id()? .

我可以传递一个id()可调用对象,然后在生成的过程中将其翻译回:

import ctypes
a = "hello world"
print ctypes.cast(id(a), ctypes.py_object).value

或者使用gc模块,只要我保持对对象的引用,它也应该工作:

import gc

def objects_by_id(id_):
    for obj in gc.get_objects():
        if id(obj) == id_:
            return obj
    raise Exception("No found")

然而,这些都不是很干净,最后,可能值得限制首先拥有所有可调用对象并仅传递索引。


推荐阅读