首页 > 解决方案 > 如何停止 multiprocessing.Pool on Exception

问题描述

在下面的代码中,我在第一次调用时引发了异常,但似乎异常被吸收了,我仍然执行了所有其他进程,这是什么问题?我想要的是,每当发生第一个异常时,打印它,然后直接停止多处理池。

def func(i):

    if i==0:
        raise Exception()
    else:
        time.sleep(1)
        print(i)

num_workers = 4

pool = multiprocessing.Pool(num_workers)

try:
    for i in range(4):
        pool.apply_async(func,args=(i,))
except:
    print("err")

pool.close()
pool.join()

以下根据 HTF 编辑的代码

import multiprocessing
import time

if __name__ == '__main__':
    def func(i):

        if i == 0:
            raise Exception()
        else:
            time.sleep(1)
            print(i)


    num_workers = 4

    pool = multiprocessing.Pool(num_workers)

    results = [pool.apply_async(func, args=(i,)) for i in range(4)]

    try:
        for result in results:
            result.get()
    except:
        print("err")

    pool.close()
    pool.join()

给出输出

err
1
2
3

我只期望的地方err

标签: pythonpython-3.xmultiprocessingpython-multiprocessing

解决方案


您刚刚安排了任务,但您需要等待结果

results = [pool.apply_async(func,args=(i,)) for i in range(4)]

try:
    for result in results:
        result.get()
except:
    print("err")

2021 年 4 月 7 日星期三 20:42:59 UTC 更新:

你可以尝试这样的事情:

import time
  
from functools import partial
from multiprocessing import Pool


def func(i):
    if i == 0:
        raise Exception("something bad happened")
    else:
        time.sleep(1)
        print(i)


def quit(pool, err):
    print(f"ERROR: {err}")
    pool.terminate()


def main():
    pool = Pool()
    partial_quit = partial(quit, pool)

    for i in range(4):
        pool.apply_async(func, args=(i,), error_callback=partial_quit)

    pool.close()
    pool.join()


if __name__ == "__main__":
    main()

测试:

$ python test1.py
ERROR: something bad happened

如果您需要返回值,则使用裸进程和队列实际上可能更容易:

import time
  
from multiprocessing import Process, Queue

PROCS = 4


def worker(q, i):
    if i == 10:
        print("error")
        q.put_nowait("ERROR")
    else:
        time.sleep(1)
        print(i)
        q.put_nowait(i)


def main():
    q = Queue()
    procs = []

    for i in range(PROCS):
        p = Process(target=worker, args=(q, i))
        p.start()
        procs.append(p)

    count = len(procs)

    while count:
        result = q.get()

        if result == "ERROR":
            for p in procs:
                p.terminate()
            break

        print(f"Result for: {result}")
        count -= 1


if __name__ == "__main__":
    main()

测试:

$ python test2.py
0
2
1
3
Result for: 0
Result for: 2
Result for: 1
Result for: 3

推荐阅读