python - 从 multiprocessing.Pool.map 提前退出(在子进程中引发不起作用)
问题描述
正如Rugnar 的回答中所指出的,我的复制是错误的。我将代码大部分保持原样,因为我不确定这在澄清和更改含义之间的位置。
我有数千个作业需要运行,并且希望任何错误立即停止执行。我将任务包装在try
/ except
...raise
中,以便我可以记录错误(没有所有多处理/线程噪音),然后重新引发。这不会杀死主进程。
发生了什么事,我怎样才能得到我正在寻找的提前退出?
sys.exit(1)
在子死锁中,将try
/ except
...raise
函数包装在另一个函数中也不起作用。
$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_reraise.py", line 5, in f_reraise
raise Exception(args)
Exception: (0,)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mp_reraise.py", line 14, in <module>
test_reraise()
File "mp_reraise.py", line 12, in test_reraise
p.map(f_reraise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)
mp_reraise.py
import multiprocessing
def f_reraise(*args):
try:
raise Exception(args)
except Exception as e:
print(e)
raise
def test_reraise():
with multiprocessing.Pool() as p:
p.map(f_reraise, range(10))
test_reraise()
如果我不抓住并重新加注,执行会按预期提前停止: [这实际上并没有停止,根据 Rugnar 的回答]
$ python3 mp_raise.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_raise.py", line 4, in f_raise
raise Exception(args)
Exception: (0,)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mp_raise.py", line 10, in <module>
test_raise()
File "mp_raise.py", line 8, in test_raise
p.map(f_raise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)
mp_raise.py
import multiprocessing
def f_raise(*args):
# missing print, which would demonstrate that
# this actually does not stop early
raise Exception(args)
def test_raise():
with multiprocessing.Pool() as p:
p.map(f_raise, range(10))
test_raise()
解决方案
在您的中mp_raise.py
,您不打印任何内容,因此您看不到完成了多少工作。我添加了打印,发现只有在作业迭代器用尽时,池才会看到孩子的异常。所以它永远不会提前停止。
如果您需要在异常发生后尽早停止,请试试这个
import time
import multiprocessing as mp
def f_reraise(i):
if abort.is_set(): # cancel job if abort happened
return
time.sleep(i / 1000) # add sleep so jobs are not instant, like in real life
if abort.is_set(): # probably we need stop job in the middle of execution if abort happened
return
print(i)
try:
raise Exception(i)
except Exception as e:
abort.set()
print('error:', e)
raise
def init(a):
global abort
abort = a
def test_reraise():
_abort = mp.Event()
# jobs should stop being fed to the pool when abort happened
# so we wrap jobs iterator this way
def pool_args():
for i in range(100):
if not _abort.is_set():
yield i
# initializer and init is a way to share event between processes
# thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
p.map(f_reraise, pool_args())
if __name__ == '__main__':
test_reraise()
推荐阅读
- javascript - Indesign 标注尺寸
- applescript - 使用苹果脚本重命名目录子文件夹中具有相同扩展名的所有文件?
- javascript - 替代 Angular 模板中的函数
- sql - 从 3 个不同的表中查询
- powershell - PS 脚本在第一个 else 语句后停止
- reactjs - 在导致 404 的路由内部刷新
- css - 如何创建网格项目的 1 和一半宽度
- javascript - 如何使用 jQuery Validate 模式规则仅允许不在不区分大小写列表中的单词
- media-source - 媒体源扩展 appendBuffer 恢复
- kubernetes - kubernetes nginx 入口请求标头或 Cookie 太大