python - Cannot access an attribute of a multiprocessing.queues.Queue subclass inside a multiprocessing.Process
Problem description
I am trying to use multiprocessing.Queue to collect the return values of a multiprocessing.Process:
queue = Queue()
for i, (name, func) in enumerate(funcs.items()):
    p = Process(target=analyse, args=(i, name, func, grid, queue))
The problem is that Queue.qsize does not work on macOS, so I used the implementation from this answer.
class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
        self.size.increment(1)
        super().put(*args, **kwargs)
Note that ctx=multiprocessing.get_context() was added to fix the missing ctx argument, following this answer.
The problematic code:
def analyse(i, name, func, grid, queue):
    ...
    queue.put((i, name, single, minimum, current, peak))
And Python's complaint:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "...", line 26, in analyse
    queue.put((i, name, single, minimum, current, peak))
  File "...", line 48, in put
    self.size.increment(1)
AttributeError: 'Queue' object has no attribute 'size'
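Any idea where the error is? I tried debugging in PyCharm: queue still has size when it is passed to multiprocessing.Process, but the attribute no longer exists when queue.put() is called inside analyse().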
Edit: Feel free to answer this question anyway. However, I gave up on Queue and used multiprocessing.Manager instead, sacrificing a few precious milliseconds.
Solution
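OK, here is the complete working solution. Apparently the queue needs its state saved and restored (via __getstate__ and __setstate__), as described here.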
import multiprocessing
import multiprocessing.queues as mpq


class Queue(mpq.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, maxsize=-1, block=True, timeout=None):
        self.block = block
        self.timeout = timeout
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def put(self, *args, **kwargs):
        super(Queue, self).put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        item = super(Queue, self).get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

    def clear(self):
        """ Remove all elements from the Queue. """
        while not self.empty():
            self.get()


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value
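
As for why __getstate__ and __setstate__ matter here: on macOS with Python 3.8 or later, the default start method is spawn, so the arguments of a Process are pickled and rebuilt in the child. multiprocessing.queues.Queue defines its own __getstate__/__setstate__ that cover only its own fields, so an attribute added by a subclass is dropped unless the subclass extends that state. Below is a minimal sketch of the effect. The classes BaseWithState, Broken and Fixed are hypothetical stand-ins (not the real Queue), and copy.copy is used in place of a real child process because it goes through the same __reduce_ex__ protocol as pickle.

```python
import copy


class BaseWithState:
    """Hypothetical stand-in for multiprocessing.queues.Queue: it serializes
    only a fixed tuple of its own fields."""

    def __init__(self, maxsize):
        self._maxsize = maxsize

    def __getstate__(self):
        return (self._maxsize,)

    def __setstate__(self, state):
        (self._maxsize,) = state


class Broken(BaseWithState):
    """Adds an attribute but does not extend the serialized state."""

    def __init__(self, maxsize):
        super().__init__(maxsize)
        self.size = 0


class Fixed(Broken):
    """Appends the extra attribute to the state, mirroring the answer."""

    def __getstate__(self):
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self.size = state[-1]


# copy.copy rebuilds the object without calling __init__, then hands the
# result of __getstate__ to __setstate__, just as unpickling would.
broken_copy = copy.copy(Broken(10))
fixed_copy = copy.copy(Fixed(10))

print(hasattr(broken_copy, "size"))  # the subclass attribute was dropped
print(hasattr(fixed_copy, "size"))   # the subclass attribute survived
```

In the Broken case the copy is rebuilt without running __init__, which matches the AttributeError in the question: the object looks complete in the parent but arrives in the child without size.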