首页 > 解决方案 > multiprocessing.Pool 在关闭/加入后无限期挂起

问题描述

我有一个不确定的生产问题,其中 amultiprocessing.Pool卡住并且永远不会从join.

我已经设法将问题简化为这个小例子,并让它在某种程度上可靠地挂起。

工作示例:

#!/usr/bin/env python3
import os
import time
import multiprocessing.pool

def run_task(i):
    print(f'[{os.getpid()}] task({i}) complete')

if __name__ == '__main__':
    tasks = iter(range(10))
    processes = 4

    pool = multiprocessing.pool.Pool(processes=processes, maxtasksperchild=1)
    running = []
    while True:
        try:
            running = [ f for f in running if not f.ready() ]
            avail = processes - len(running)
            if avail:
                for _ in range(avail):
                    i = next(tasks)
                    print(f'[{os.getpid()}] add task({i})')
                    future = pool.apply_async(run_task, ( i, ))
                    running.append(future)
            else:
                time.sleep(0.1)
        except StopIteration:
            print(f'[{os.getpid()}] all tasks scheduled')
            break

    print(f'[{os.getpid()}] close and join pool')
    pool.close()
    pool.join()
    print(f'[{os.getpid()}] all done')

据推测,问题是时间问题之一,因为失败是不确定的。因此,我必须循环运行它才能让它挂起(尽管根据我的经验,它会挂在前几次迭代中的一个上)。

for i in {1..100}; do ./test.py; done   

挂起时的输出:

[15243] add task(0)
[15243] add task(1)
[15243] add task(2)
[15243] add task(3)
[15244] task(0) complete
[15245] task(1) complete
[15246] task(2) complete
[15247] task(3) complete
[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool     <-- hangs here indefinitely

主进程的gdb回溯:

#0  0x00007fb132b7c6c2 in __GI___waitpid (pid=22857, stat_loc=0x7fff8ef55d5c, options=0) at ../sysdeps/unix/sysv/linux/waitpid.c:30
#1  0x00000000005d10e5 in os_waitpid_impl (module=<optimised out>, options=0, pid=22857) at ../Modules/posixmodule.c:6941
#2  os_waitpid.lto_priv () at ../Modules/clinic/posixmodule.c.h:2995
#3  0x000000000050a84f in _PyCFunction_FastCallDict (kwargs=<optimised out>, nargs=<optimised out>, args=<optimised out>, func_obj=0x7fb132fea0d8) at ../Objects/methodobject.c:234
#4  _PyCFunction_FastCallKeywords (kwnames=<optimised out>, nargs=<optimised out>, stack=<optimised out>, func=<optimised out>) at ../Objects/methodobject.c:294
#5  call_function.lto_priv () at ../Python/ceval.c:4851

子进程的gdb回溯:

#0  0x00007fb1328896d6 in futex_abstimed_wait_cancelable (private=0, abstime=0x0, expected=0, futex_word=0x1c68e40) at ../sysdeps/unix/sysv/linux/futex-internal.h:205
#1  do_futex_wait (sem=sem@entry=0x1c68e40, abstime=0x0) at sem_waitcommon.c:111
#2  0x00007fb1328897c8 in __new_sem_wait_slow (sem=0x1c68e40, abstime=0x0) at sem_waitcommon.c:181
#3  0x00000000005ab535 in PyThread_acquire_lock_timed (intr_flag=<optimised out>, microseconds=<optimised out>, lock=<optimised out>) at ../Python/thread_pthread.h:386
#4  PyThread_acquire_lock () at ../Python/thread_pthread.h:595
#5  0x0000000000446bf1 in _enter_buffered_busy (self=self@entry=0x7fb13307aa98) at ../Modules/_io/bufferedio.c:292
#6  0x00000000004ce743 in buffered_flush.lto_priv () at ../Python/thread_pthread.h:416

实施说明:

仅在工作人员可用时安排任务:

每个单独的任务的优先级可以在等待执行时改变,所以我不能一开始就将所有任务排入队列。

因此running列表并检查AsyncResult.ready以确定我是否可以执行另一个任务

maxtasksperchild=1:

任务泄漏内存,所以为了回收每个任务运行后丢失的内存,我作弊并使用maxtasksperchild=1


观察:

睡觉 vs 忙着等待:

有趣的是,如果我将其更改time.sleep(0.1)为忙等待,挂起就会消失。

wait = time.time() + 0.1
while time.time() < wait:
    pass

是否有可能在父母睡眠期间错过了来自子进程的信号?

maxtasksperchild=1:

如果我重用原来的子进程,挂起就会消失。


因此,在每个任务完成后进程被销毁的事实与父进程睡眠之间似乎存在某种相互作用。

作为生产中的快速修复,我已经将睡眠改为忙碌的等待,但这感觉就像一个丑陋的黑客,我想了解究竟是什么导致了挂起。

标签: pythonpython-multiprocessing

解决方案


我认为问题是例外,从技术上讲,它不应该存在,并且可能已经在更高版本的 python 中得到修复。

[15243] add task(4)
[15243] add task(5)
[15251] task(4) complete
[15243] add task(6)
[15243] add task(7)
[15252] task(5) complete
[15253] task(6) complete
[15243] add task(8)
[15243] add task(9)
[15243] all tasks scheduled <-- Exception Called but [15254] or task(7) is not completed
[15255] task(8) complete
[15256] task(9) complete
[15243] close and join pool     <-- hangs here indefinitely

在异常调用的那个点发生了一些事情,这可能会导致 task(7) 进入一个奇怪的状态,apply_async 允许回调,这意味着 3.6 可能会以不稳定的方式创建线程。

阻塞等待意味着您的主服务器不会休眠,并且可能会更快地处理此问题。检查是否增加等待时间或使用 apply() 会有所不同。

我不确定为什么重用“修复”问题,但可能只是访问时间更快更容易处理。


推荐阅读