首页 > 解决方案 > Python多处理队列子类在进程中丢失属性

问题描述

我正在尝试从 python 中的多处理队列中实现一个子类。子类包含一个简单的布尔标志“就绪”。当我将队列发送到新进程时,就绪属性正在消失。下面的代码演示了这个问题:

import multiprocessing
import multiprocessing.queues


class ReadyQueue(multiprocessing.queues.Queue):
    def __init__(self, ctx, *args, **kwargs):
        super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
        self.ready = False


def ready_queue(*args, **kwargs):
    return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)


def foo(q):
    print(q.ready)


if __name__ == "__main__":
    my_queue = ready_queue()
    print(my_queue.ready)
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()

输出:

False
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
    print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'

标签: pythoninheritancepython-multiprocessing

解决方案


我实现了这个解决方法:

import multiprocessing
from queue import Empty
import time
import ctypes


class ReadyQueue:
    def __init__(self, *args, **kwargs):
        self.queue = multiprocessing.Queue(*args, **kwargs)
        self._ready = multiprocessing.Value(ctypes.c_bool, False)

    def set_ready(self):
        self._ready.value = True

    def set_not_ready(self):
        self._ready.value = False
        self.clear()

    def is_ready(self):
        return self._ready.value

    def clear(self):
        try:
            while True:
                self.queue.get(block=False)
        except Empty:
            pass

    def get(self, block=True, timeout=None):
        return self.queue.get(block, timeout)

    def put(self, obj, block=True, timeout=None):
        return self.queue.put(obj, block, timeout)

    def full(self):
        return self.queue.full()

    def empty(self):
        return self.queue.empty()

    def qsize(self):
        return self.queue.qsize()


def foo(q):
    while q.is_ready():
        time.sleep(1)
        q.put("hello from foo")
    print("q no longer ready, foo loop finished")


if __name__ == "__main__":
    my_queue = ReadyQueue()
    my_queue.set_ready()
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()

    for i in range(2):
        print(my_queue.get())
        time.sleep(2)

    print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
    my_queue.set_not_ready()
    print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))

输出:

C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
hello from foo
hello from foo
my_queue._ready = True, qsize: 2. Setting not ready..
my_queue._ready = False, qusize: 0
q no longer ready, foo loop finished

Process finished with exit code 0

解决方法是让我的ReadyQueue类不继承自multiprocessing.queues.Queue队列,但将队列作为属性。为方便起见,我从队列中实现了我需要的方法,它们只是传递给队列属性。我还实现了一个clear方法。

请注意,在我的第一个示例中,我忽略了制作self.readya multiprocessing.Value,因此无法跨进程对其进行编辑,但我在修复该问题后进行了测试,这不是问题的根源。


推荐阅读