首页 > 解决方案 > 使用 python 多进程时在 (CLOSE, TERMINATE) 中断言 self._state

问题描述

我目前正在尝试使用 python 多处理。我使用的库是multiprocess(NOT multiprocessing)。

我有以下代码,它创建了许多计算作业,并通过映射操作运行它:

pool = multiprocess.Pool(4)
all_responses = pool.map_async(wrapper_singlerun, range(10000))
pool.join()
pool.close()

但是,每当我运行这段代码时,都会收到以下错误:

    pool.join()
  File "/Users/davidal/miniconda3/lib/python3.6/site-packages/multiprocess/pool.py", line 509, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

您知道为什么会发生此错误吗?我以前用过pool.map_async,但认为我需要一个pool rendez-vous命令。否则,我的电脑创建了类似 forkbomb 的东西,它创建了太多线程(至少,我认为它是这样做的......)

任何想法表示赞赏!

标签: pythonmultiprocessingthreadpoolpython-multiprocessingmultiprocess

解决方案


问题是你join之前打电话close

multiprocess似乎缺少它的文档,但是,据我所知,它基本上是multiprocessingpre-monkeypatches dillfor中的 stdlib 的一个分支pickle,因此multiprocessing文档应该与此处相关。(另外,在评论中,你说你可以用 . 重现问题multiprocessing。)

所以,Pool.join说:

等待工作进程退出。必须调用close()terminate()使用之前join()

close方法是关闭队列的发送端以便无法添加新任务的方式。该join方法是您等待处理队列中所有内容的方式。在关闭之前等待队列耗尽是行不通的。

但是你打电话给closeafter join,而不是 before 。第一件事join你已经assert调用了closeor terminate,但你没有调用,因此断言失败。

因此,您可能只想切换这两个调用的顺序。

或者,也许您对什么join是 for 感到困惑,并认为您需要先调用它,然后才能使用all_responses.get()or .wait()。如果是这样——你不需要这样做;将get阻塞,直到结果可用,之后您不需要join. 这实际上更常见,尤其是map和朋友一起使用(尽管文档中的示例是通过 awith Pool(…) as pool:而不是手动调用池中的任何内容)。


推荐阅读