Why doesn't process.join() return when the process has terminated?

Problem description

My code is as follows:

import time
from multiprocessing import Process, Queue
task_queue = Queue(TASK_QUEUE_SIZE)
result_queue = Queue()

def worker_thread(worker_id, task_queue, result_queue):
    # consume task_queue and put result into result_queue
    # if task is None, it prints closed message and returns 0
    ...

workers = [Process(target=worker_thread, args=(i, task_queue, result_queue)) 
           for i in range(NUM_OF_THREADS)]
for w in workers:
    w.start()

# put task into task_queue, read result from result_queue...
# all done. try to exit.

print('waiting 120 seconds for workers...')
time.sleep(120)  # wait long enough for workers return
for no, w in enumerate(workers):
    print(f'joining worker #{no}')
    w.join(1)
    print(f'worker #{no} is_alive: {w.is_alive()} exitcode: {w.exitcode}')

This program produces a log like this:

pending jobs: 1
worker #0 is started.
waiting 120 seconds for workers...
worker #0 got task id: 123
worker #0 got result for task id: 123
worker #0 is closed.
joining worker #0
worker #0 is_alive: False exitcode: 0

If I use w.join() without a timeout, it waits forever. Very strange. Can you give me some clues? Thanks in advance.

====

Update: I found that my code only sometimes waits forever. When the log looks like the one in my original post, the program exits safely. But when it looks like the one below (using join(1)), it gets stuck forever. The strange part is that worker #0 printed its "closed" message but did not exit.

waiting 120 seconds for workers...
worker #0 got result id: 123
worker #0 is closed.
joining worker #0
worker #0 is_alive: True exitcode: None


import traceback

def worker_thread(worker_id: int, task_queue, result_queue):
    print(f'worker #{worker_id} is started.')

    while True:
        try:
            t = task_queue.get()
            if t is None:
                break
            r = do_task(t)
            if r:
                result_queue.put((t, r))
        except Exception as e:
            print(f'[FATAL] worker #{worker_id} got unknown error on task {t}.\nReason: {type(e)} {e}')
            traceback.print_exc()
            continue

    print(f'worker #{worker_id} is closed.')
    return 0
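This symptom matches a documented CPython behavior: a process that has put items on a multiprocessing.Queue does not terminate until the queue's background feeder thread has flushed all buffered data into the underlying pipe. The worker's function body can run to completion (printing "is closed") while the feeder thread is still blocked on a full pipe, so is_alive() stays True and exitcode stays None. A minimal sketch reproducing this, where the 1 MB payload is an assumption chosen to exceed a typical OS pipe buffer (around 64 KB on Linux):

```python
from multiprocessing import Process, Queue

def child(q):
    # Payload large enough to overflow the pipe buffer, so the queue's
    # feeder thread blocks while flushing it when the process exits.
    q.put('x' * 1_000_000)
    print('child function body is done.')  # prints, yet the process lives on

if __name__ == '__main__':
    q = Queue()
    p = Process(target=child, args=(q,))
    p.start()
    p.join(1)                # times out: the feeder thread is still blocked
    print(p.is_alive())      # True, exitcode None -- the same symptom as above
    q.get()                  # drain the queue...
    p.join()                 # ...and now the child exits normally
    print(p.is_alive(), p.exitcode)
```

Notice that the child "finishes" its function body immediately, yet join() only returns after the parent drains the queue.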

Tags: python-3.x, python-multiprocessing

Solution
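The usual fix, following the "Joining processes that use queues" guidance in the multiprocessing documentation, is to drain result_queue completely before calling join() on the workers, never the other way around. A minimal self-contained sketch of that ordering (worker logic and all names here are illustrative, not the original code):

```python
from multiprocessing import Process, Queue

def worker(task_queue, result_queue):
    while True:
        t = task_queue.get()
        if t is None:                # sentinel: time to shut down
            break
        result_queue.put(t * 2)

if __name__ == '__main__':
    task_queue, result_queue = Queue(), Queue()
    workers = [Process(target=worker, args=(task_queue, result_queue))
               for _ in range(2)]
    for w in workers:
        w.start()

    tasks = list(range(1000))
    for t in tasks:
        task_queue.put(t)
    for _ in workers:
        task_queue.put(None)         # one sentinel per worker

    # Drain every result BEFORE joining, so the workers' feeder
    # threads are never left blocked on a full pipe.
    results = [result_queue.get() for _ in tasks]

    for w in workers:
        w.join()                     # returns promptly now
    print(sorted(results)[:3])       # [0, 2, 4]
```

The 120-second sleep in the original code does not help: no amount of waiting lets a worker exit while its feeder thread is blocked, because only the parent reading from result_queue can unblock it.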
