Cannot access attributes of a multiprocessing.queues.Queue subclass inside a multiprocessing.Process

Problem description

I am trying to use a multiprocessing.Queue to collect the return values of several multiprocessing.Process workers:

queue = Queue()
for i, (name, func) in enumerate(funcs.items()):
    p = Process(target=analyse, args=(i, name, func, grid, queue))
    p.start()  # each worker is started (see the traceback below)

The problem is that Queue.qsize() does not work on macOS, so I used the implementation from this answer.
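For context, this is roughly what the failure looks like on macOS, where sem_getvalue() is not available (a minimal sketch, not from the original post):

import multiprocessing

if __name__ == "__main__":
    q = multiprocessing.Queue()
    q.put("x")
    try:
        q.qsize()
    except NotImplementedError:
        # macOS lacks sem_getvalue(), so the stock qsize() cannot be implemented
        print("qsize() is not implemented on this platform")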

import multiprocessing
import multiprocessing.queues

# SharedCounter is the synchronized counter class from the linked answer
# (its definition is reproduced in full in the solution below).
class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super().put(*args, **kwargs)

Note that, per this answer, ctx=multiprocessing.get_context() was added to fix the missing ctx argument.

The offending code:

def analyse(i, name, func, grid, queue):
    ...
    queue.put((i, name, single, minimum, current, peak))

and Python's complaint:

Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "...", line 26, in analyse
    queue.put((i, name, single, minimum, current, peak))
  File "...", line 48, in put
    self.size.increment(1)
AttributeError: 'Queue' object has no attribute 'size'

Any idea where the error is? I tried debugging in PyCharm: queue still has size when it is passed to multiprocessing.Process, but the attribute is gone by the time queue.put() is called inside analyse().

Edit: Feel free to answer the question anyway. However, I gave up on the custom Queue and used a multiprocessing.Manager instead, sacrificing a few precious milliseconds there.
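For completeness, the Manager-based workaround mentioned in the edit would look roughly like this (a sketch reusing the question's names analyse, funcs and grid, not code from the original post). The Manager proxies the queue through a server process, which costs a little latency but pickles cleanly:

import multiprocessing

if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()  # a proxy object; safe to pass to child processes
        procs = []
        for i, (name, func) in enumerate(funcs.items()):
            p = multiprocessing.Process(target=analyse, args=(i, name, func, grid, queue))
            p.start()
            procs.append(p)
        for p in procs:
            p.join()
        while not queue.empty():  # manager queues also support qsize()/empty() on macOS
            print(queue.get())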

Tags: python, python-multiprocessing

Solution


OK, here is the complete working solution. Apparently the queue needs its state saved and restored when it is pickled for the child process, as described here.

import multiprocessing
import multiprocessing.queues as mpq

class Queue(mpq.Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """
    def __init__(self, maxsize=-1, block=True, timeout=None):
        self.block = block
        self.timeout = timeout
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        # Append the shared counter to the parent's pickled state so that it
        # survives the trip to the child process.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        # Restore the parent's state, then re-attach the counter.
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def put(self, *args, **kwargs):
        super(Queue, self).put(*args, **kwargs)
        self.size.increment(1)
    
    def get(self, *args, **kwargs):
        item = super(Queue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()

class SharedCounter(object):
    """ A synchronized shared counter.
    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.
    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value
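A quick usage sketch (with a hypothetical worker function, assuming the Queue and SharedCounter definitions above live in the same module; not part of the original answer) showing that the subclass now survives being passed to a child process:

def worker(q):
    # put() works here because __setstate__ re-attached q.size after unpickling
    q.put("hello from the child")

if __name__ == "__main__":
    q = Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()
    print(q.qsize())  # 1
    print(q.get())    # hello from the child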
