python - Python多处理队列子类在进程中丢失属性
问题描述
我正在尝试从 python 中的多处理队列中实现一个子类。子类包含一个简单的布尔标志“就绪”。当我将队列发送到新进程时,就绪属性正在消失。下面的代码演示了这个问题:
import multiprocessing
import multiprocessing.queues
class ReadyQueue(multiprocessing.queues.Queue):
def __init__(self, ctx, *args, **kwargs):
super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
self.ready = False
def ready_queue(*args, **kwargs):
return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)
def foo(q):
print(q.ready)
if __name__ == "__main__":
my_queue = ready_queue()
print(my_queue.ready)
p = multiprocessing.Process(target=foo, args=(my_queue,))
p.start()
p.join()
输出:
False
Process Process-1:
Traceback (most recent call last):
File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'
解决方案
我实现了这个解决方法:
import multiprocessing
from queue import Empty
import time
import ctypes
class ReadyQueue:
def __init__(self, *args, **kwargs):
self.queue = multiprocessing.Queue(*args, **kwargs)
self._ready = multiprocessing.Value(ctypes.c_bool, False)
def set_ready(self):
self._ready.value = True
def set_not_ready(self):
self._ready.value = False
self.clear()
def is_ready(self):
return self._ready.value
def clear(self):
try:
while True:
self.queue.get(block=False)
except Empty:
pass
def get(self, block=True, timeout=None):
return self.queue.get(block, timeout)
def put(self, obj, block=True, timeout=None):
return self.queue.put(obj, block, timeout)
def full(self):
return self.queue.full()
def empty(self):
return self.queue.empty()
def qsize(self):
return self.queue.qsize()
def foo(q):
while q.is_ready():
time.sleep(1)
q.put("hello from foo")
print("q no longer ready, foo loop finished")
if __name__ == "__main__":
my_queue = ReadyQueue()
my_queue.set_ready()
p = multiprocessing.Process(target=foo, args=(my_queue,))
p.start()
for i in range(2):
print(my_queue.get())
time.sleep(2)
print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
my_queue.set_not_ready()
print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))
输出:
C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
hello from foo
hello from foo
my_queue._ready = True, qsize: 2. Setting not ready..
my_queue._ready = False, qusize: 0
q no longer ready, foo loop finished
Process finished with exit code 0
解决方法是让我的ReadyQueue
类不继承自multiprocessing.queues.Queue
队列,但将队列作为属性。为方便起见,我从队列中实现了我需要的方法,它们只是传递给队列属性。我还实现了一个clear
方法。
请注意,在我的第一个示例中,我忽略了制作self.ready
a multiprocessing.Value
,因此无法跨进程对其进行编辑,但我在修复该问题后进行了测试,这不是问题的根源。
推荐阅读
- jquery - 如何使用 jquery 仅从该 HTML 表的可见行中检索值?
- django - Django forms.validation 错误未显示
- mathjax - 下标中的花括号导致 mathjax 中的渲染问题
- python - 用 Python 计算 Keras 神经网络的准确度
- f# - 如何在 F# 中传递带有偏移量的数组?
- android - Android Studio:无效缓存/重启后未解决的对活动主的引用
- sparql - 如何删除重复的空白节点
- java - 具有多个操作的 Java 计算器出现字符串错误
- python - Tweepy 回复提及机器人
- sql - SQL 查询选择小于或等于现有列的 (max - min) 的行