Exiting early from multiprocessing.Pool.map (raising in the child process doesn't work)

Question

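As Rugnar's answer points out, my reproduction was wrong. I have left the code mostly as it was, because I'm not sure where the line falls between clarifying the question and changing its meaning.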

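I have thousands of jobs to run, and I want any error to stop execution immediately. I wrap each task in try/except...raise so that I can log the error (without all the multiprocessing/threading noise) and then re-raise it. This does not kill the main process.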

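What is going on, and how can I get the early exit I'm looking for? Calling sys.exit(1) in the child deadlocks, and wrapping the try/except...raise function in another function doesn't work either.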

$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_reraise.py", line 5, in f_reraise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_reraise.py", line 14, in <module>
    test_reraise()
  File "mp_reraise.py", line 12, in test_reraise
    p.map(f_reraise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)

mp_reraise.py

import multiprocessing

def f_reraise(*args):
    try:
        raise Exception(args)
    except Exception as e:
        print(e)
        raise

def test_reraise():
    with multiprocessing.Pool() as p:
        p.map(f_reraise, range(10))

test_reraise()

If I don't catch and re-raise, execution stops early as expected: [this does not actually stop early, per Rugnar's answer]

$ python3 mp_raise.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "mp_raise.py", line 4, in f_raise
    raise Exception(args)
Exception: (0,)
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mp_raise.py", line 10, in <module>
    test_raise()
  File "mp_raise.py", line 8, in test_raise
    p.map(f_raise, range(10))
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
Exception: (0,)  

mp_raise.py

import multiprocessing

def f_raise(*args):
    # missing print, which would demonstrate that
    # this actually does not stop early
    raise Exception(args)

def test_raise():
    with multiprocessing.Pool() as p:
        p.map(f_raise, range(10))

test_raise()

Tags: python, python-3.x, python-multiprocessing

Answer


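Your mp_raise.py doesn't print anything, so you can't see how many jobs were actually run. I added a print and found that the pool only sees the child's exception once the job iterator is exhausted. So it never stops early.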
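As a rough check of that claim, the sketch below counts how many tasks actually start before map() raises. The names init, started and count_started are made up for illustration, and the shared counter is handed to the workers through the pool initializer. It assumes a CPython version in which map() reports a failure only after every chunk has finished, in which case the count should equal the number of tasks.

```python
import multiprocessing as mp


def init(counter):
    # Hypothetical helper: keep the shared counter as a global in each worker.
    global started
    started = counter


def f_raise(i):
    with started.get_lock():
        started.value += 1  # record that this task really ran
    raise Exception(i)


def count_started(n_tasks=10, n_workers=2):
    counter = mp.Value('i', 0)  # integer shared between parent and workers
    with mp.Pool(n_workers, initializer=init, initargs=(counter,)) as p:
        try:
            p.map(f_raise, range(n_tasks), chunksize=1)
        except Exception as e:
            print('map raised:', e)
    return counter.value


if __name__ == '__main__':
    print('tasks started:', count_started())
```

If the pool stopped at the first failure, the printed count would be much smaller than the number of tasks submitted.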

If you need to stop as soon as possible after an exception, try this:

import time
import multiprocessing as mp


def f_reraise(i):
    if abort.is_set():  # skip the job if another job has already failed
        return
    time.sleep(i / 1000)  # simulate work so jobs are not instantaneous, as in real life
    if abort.is_set():  # re-check mid-job so a running job can bail out after an abort
        return
    print(i)
    try:
        raise Exception(i)
    except Exception as e:
        abort.set()  # tell every other worker to stop
        print('error:', e)
        raise


def init(a):
    global abort
    abort = a


def test_reraise():
    _abort = mp.Event()

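    # The intent is to stop feeding jobs to the pool once abort is set,
    # so the jobs iterator is wrapped this way.
    # Note: Pool.map() turns an iterable without __len__ into a list before
    # dispatching, so with map() this generator is drained up front and the
    # abort checks inside f_reraise do the real work.
    def pool_args():
        for i in range(100):
            if not _abort.is_set():
                yield i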

    # initializer/initargs hand the shared Event to every worker process
    # thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
    with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
        p.map(f_reraise, pool_args())


if __name__ == '__main__':
    test_reraise()
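Another option, not part of the code above, is to let the parent process decide when to stop. The sketch below uses Pool.imap_unordered, which hands back each result (or re-raises the task's exception) as it arrives instead of waiting for the whole batch. The work function, the failing item 3 and run_until_first_error are hypothetical names chosen for the example.

```python
import multiprocessing as mp
import time


def work(i):
    # Hypothetical task: item 3 fails, every other item sleeps briefly.
    if i == 3:
        raise ValueError(f'job {i} failed')
    time.sleep(0.01)
    return i


def run_until_first_error(n_tasks=1000, n_workers=4):
    done = []
    with mp.Pool(n_workers) as p:
        try:
            # Results are consumed as they complete; a failed task
            # re-raises its exception here in the parent.
            for result in p.imap_unordered(work, range(n_tasks)):
                done.append(result)
        except ValueError as e:
            print('stopping early:', e)
        # Leaving the with-block calls p.terminate(), which stops the workers.
    return done


if __name__ == '__main__':
    finished = run_until_first_error()
    print(len(finished), 'jobs finished before the error surfaced')
```

Jobs that are still running when the pool is terminated are killed rather than finished, so this assumes the jobs are safe to interrupt.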
