首页 > 解决方案 > 进程完成但无法加入?

问题描述

为了加速某项任务,我将子类化Process以创建一个工作程序来处理来自样本的数据。一些管理类将为其提供数据并读取输出(使用两个Queue实例)。对于异步操作,我使用put_nowaitand get_nowait。最后,我向我的进程发送了一个特殊的退出代码,它打破了它的内部循环。然而……它永远不会发生。这是一个最小的可重现示例:

import multiprocessing as mp

class Worker(mp.Process):
  def __init__(self, in_queue, out_queue):
    super(Worker, self).__init__()
    self.input_queue = in_queue
    self.output_queue = out_queue

  def run(self):
    while True:
      received = self.input_queue.get(block=True)
      if received is None:
        break
      self.output_queue.put_nowait(received)
    print("\tWORKER DEAD")


class Processor():
  def __init__(self):
    # prepare
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    worker = Worker(in_queue, out_queue)
    # get to work
    worker.start()
    in_queue.put_nowait(list(range(10**5))) # XXX
    # clean up
    print("NOTIFYING")
    in_queue.put_nowait(None)
    #out_queue.get() # XXX
    print("JOINING")
    worker.join()

Processor()

这段代码永远不会完成,像这样永久挂起:

NOTIFYING
JOINING
    WORKER DEAD

为什么?

我用 标记了两行XXX。在第一个中,如果我发送较少的数据(例如10**4),一切都会正常完成(进程按预期加入)。同样在第二个中,如果我get()在通知工人完成后。我知道我遗漏了一些东西,但文档中的任何内容似乎都不相关。

标签: python-3.xqueuepython-multiprocessing

解决方案


文档提到

当一个对象被放入队列时,该对象被腌制并且后台线程稍后将腌制数据刷新到底层管道。这会产生一些后果 [...] 将对象放入空队列后,在队列的 empty() 方法返回 False 和 get_nowait() 可以在不引发 queue.Empty 的情况下返回之前可能会有一个无限小的延迟。

https://docs.python.org/3.7/library/multiprocessing.html#pipes-and-queues

此外,

每当您使用队列时,您需要确保所有已放入队列的项目最终将在进程加入之前被删除。否则,您无法确定将项目放入队列的进程将终止。

https://docs.python.org/3.7/library/multiprocessing.html#multiprocessing-programming

这意味着您描述的行为可能是由工作人员中的竞争条件引起的self.output_queue.put_nowait(received),并且与处理器中的工作人员一起worker.join()加入__init__。如果加入比将其送入队列更快,那么一切都很好。如果太慢,队列中有一个项目,并且工作人员不会加入。

out_queue.get()在主进程中取消注释将清空队列,从而允许加入。但是,如果队列已经为空,则队列返回很重要,因此使用超时可能是尝试等待竞速条件结束的一种选择,例如out_qeue.get(timeout=10)

可能重要的还有保护主例程,尤其是对于 Windows(Windows上的 python 多处理,如果 __name__ == "__main__"


推荐阅读