multiprocessing - Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?
Problem description
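I'm working on macOS and was recently bitten by Python 3.8 changing the default `multiprocessing` start method on macOS from "fork" to "spawn" (see the docs). Below is a simplified example in which "fork" succeeds but "spawn" fails. The purpose of the code is to create a custom queue object that supports calling `size()` under macOS, hence the inheritance from `Queue` and the use of multiprocessing's context.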
```python
import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)
```
Here is the error output when using "spawn":
```
Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'
```
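It seems as if the child process considered `self.size` unnecessary for running the code and therefore didn't copy it. My question is: why does this happen?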
Code snippet tested on macOS Catalina 10.15.6 with Python 3.8.5.
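The guess above can be tested without starting any processes. This is an illustration only: `FixedStateBase` and `Child` are hypothetical names, and `FixedStateBase` merely imitates how CPython's `multiprocessing.queues.Queue` pickles itself, by returning a fixed tuple of its own fields from `__getstate__`. A pickle round trip, which is roughly what "spawn" does to the `Process` object and its arguments, then loses any attribute a subclass added:

```python
import pickle


class FixedStateBase:
    """Hypothetical stand-in for multiprocessing.queues.Queue: like Queue,
    it pickles only a fixed tuple of the fields it knows about."""

    def __init__(self):
        self._maxsize = 10

    def __getstate__(self):
        # Only this class's own fields are serialized.
        return (self._maxsize,)

    def __setstate__(self, state):
        # __init__ is NOT called when unpickling; only this method runs.
        (self._maxsize,) = state


class Child(FixedStateBase):
    def __init__(self):
        super().__init__()
        self.size = 1  # plays the role of Q.size from the question


original = Child()
# Roughly the round trip "spawn" performs for objects sent to the child.
clone = pickle.loads(pickle.dumps(original))

print(original.size)           # 1
print(clone._maxsize)          # 10
print(hasattr(clone, "size"))  # False
```

The attribute is not judged unnecessary; it is simply never written into the pickle.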
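Solution
The problem is that a spawned process does not share resources with its parent: under "spawn", the `Process` object and its arguments are pickled in the parent and unpickled in a fresh interpreter. `multiprocessing.queues.Queue` defines its own `__getstate__`/`__setstate__`, which save and restore only the queue's internal fields (including its pipe ends, locks and semaphore), so an attribute added by a subclass, such as `size`, never reaches the child. Under "fork", the child inherits a copy of the parent's memory and nothing is pickled, which is why the attribute survives there. To recreate the queue instance correctly in each process, you need to extend the pickling and unpickling methods. Here is working code: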
```python
# Portable queue
# The idea of Victor Terron used in Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added to share Queue instance between processes correctly.
import multiprocessing
import multiprocessing.queues


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        # Append the counter to the base Queue's pickled state...
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        # ...and split it off again when unpickling in the child.
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()
```
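Applied to the question's original class, the same fix needs only two extra methods. This is a minimal sketch assuming `size` is an ordinary picklable value. Like the portable `Queue` above, it relies on `Queue.__getstate__` returning a tuple, which is how CPython implements it rather than a documented guarantee:

```python
import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def __getstate__(self):
        # Queue.__getstate__ returns a tuple of its internal fields;
        # append the subclass attribute so it is pickled too.
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        # Give Queue back exactly the tuple it produced, then restore size.
        super().__setstate__(state[:-1])
        self.size = state[-1]

    def call(self):
        print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join()  # the child should now print 1 instead of raising AttributeError
```

Note that the child receives a copy of `size`: assigning to it in one process is not visible in the other. When a value must stay in sync across processes (as the queue length must), a shared object such as the `SharedCounter` built on `multiprocessing.Value` is needed instead.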