python-3.x - 进程完成但无法加入?
问题描述
为了加速某项任务,我将子类化Process
以创建一个工作程序来处理来自样本的数据。一些管理类将为其提供数据并读取输出(使用两个Queue
实例)。对于异步操作,我使用put_nowait
and get_nowait
。最后,我向我的进程发送了一个特殊的退出代码,它打破了它的内部循环。然而……它永远不会发生。这是一个最小的可重现示例:
import multiprocessing as mp
class Worker(mp.Process):
def __init__(self, in_queue, out_queue):
super(Worker, self).__init__()
self.input_queue = in_queue
self.output_queue = out_queue
def run(self):
while True:
received = self.input_queue.get(block=True)
if received is None:
break
self.output_queue.put_nowait(received)
print("\tWORKER DEAD")
class Processor():
def __init__(self):
# prepare
in_queue = mp.Queue()
out_queue = mp.Queue()
worker = Worker(in_queue, out_queue)
# get to work
worker.start()
in_queue.put_nowait(list(range(10**5))) # XXX
# clean up
print("NOTIFYING")
in_queue.put_nowait(None)
#out_queue.get() # XXX
print("JOINING")
worker.join()
Processor()
这段代码永远不会完成,像这样永久挂起:
NOTIFYING
JOINING
WORKER DEAD
为什么?
我用 标记了两行XXX
。在第一个中,如果我发送较少的数据(例如10**4
),一切都会正常完成(进程按预期加入)。同样在第二个中,如果我get()
在通知工人完成后。我知道我遗漏了一些东西,但文档中的任何内容似乎都不相关。
解决方案
文档提到
当一个对象被放入队列时,该对象被腌制并且后台线程稍后将腌制数据刷新到底层管道。这会产生一些后果 [...] 将对象放入空队列后,在队列的 empty() 方法返回 False 和 get_nowait() 可以在不引发 queue.Empty 的情况下返回之前可能会有一个无限小的延迟。
https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues
此外,
每当您使用队列时,您需要确保所有已放入队列的项目最终将在进程加入之前被删除。否则,您无法确定将项目放入队列的进程将终止。
https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming
这意味着您描述的行为可能是由工作人员中的竞争条件引起的self.output_queue.put_nowait(received)
,并且与处理器中的工作人员一起worker.join()
加入__init__
。如果加入比将其送入队列更快,那么一切都很好。如果太慢,队列中有一个项目,并且工作人员不会加入。
out_queue.get()
在主进程中取消注释将清空队列,从而允许加入。但是,如果队列已经为空,则队列返回很重要,因此使用超时可能是尝试等待竞速条件结束的一种选择,例如out_qeue.get(timeout=10)
。
可能重要的还有保护主例程,尤其是对于 Windows(Windows上的 python 多处理,如果 __name__ == "__main__")
推荐阅读
- javascript - 这是什么图案
- python - 在图形模式下评估张量
- r - 使用 Spark 将函数中的多个列名传递给 dplyr::distinct()
- python-3.x - 如何从 Minio 存储桶对象中获取 PyPDF2 元数据?
- css - 如何使图像将其高度调整为另一个网格项的高度?
- sql - 序列中的最大出现次数(高级间隙和孤岛问题)
- python - 不能把我的头绕在 Classes 和 Kivy 周围
- reactjs - 将本机 base64 反应为 pdf 展览打印
- aws-api-gateway - 有没有办法使用无服务器或其插件在 AWS 使用计划中配置方法限制
- sqlite - 全文搜索雪球算法如何解释未指定语言的单词