Python error inside a map_async worker affects the execution plan

Problem description

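Guys! I ran into this problem and I'm curious about what is going on. The pool forks 3 processes. My further assumption: each process pulls tasks from the parent's task queue. As we can see, the processes do not die, but some tasks are skipped. Maybe someone has an idea?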

from multiprocessing import Pool
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with Pool(processes=3) as pool:
        p = pool.map_async(wrk, (i for i in range(50)))
        p.wait()

Result

0 29836 GONNA DIE
5 29835
6 29835 GONNA DIE
10 29836
11 29836
12 29836 GONNA DIE
15 29836 GONNA DIE
20 29836
21 29836 GONNA DIE
25 29835
26 29835
27 29835 GONNA DIE
30 29836 GONNA DIE
35 29835
36 29835 GONNA DIE
40 29836
41 29836
42 29836 GONNA DIE
45 29835 GONNA DIE

Tags: python, error-handling, multiprocessing

Solution


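Very interesting. When you do not specify the chunksize argument to the map method, a value is computed from the size of the iterable and the size of the pool. In your case, I believe it will use a chunksize of 5.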
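For reference, that default can be reproduced in a few lines. This is only a sketch of the calculation as I understand it from CPython's Pool implementation (an internal detail, not a documented API, so it may differ between versions); default_chunksize is a name made up for illustration.

```python
def default_chunksize(n_items: int, n_workers: int) -> int:
    """Approximate the chunksize Pool.map/map_async picks when none is given.

    Assumption: this mirrors CPython's internal calculation (about four
    chunks per worker, rounded up). It is not a public API.
    """
    if n_items == 0:
        return 0
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so every item lands in some chunk
    return chunksize


if __name__ == "__main__":
    # 50 inputs on a 3-process pool, as in the question.
    print(default_chunksize(50, 3))
```

For 50 inputs and 3 processes this gives 5, which is consistent with the batches in the question's output starting at 0, 5, 10, 15 and so on.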

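This means that each idle process in the pool pulls up to 5 tasks at a time from the input queue and processes them as a batch. It seems (and this surprised me) that if one of those tasks raises an exception, the process does not go on to run any of the remaining tasks in the batch. If we force chunksize to 1, we get: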

from multiprocessing import Pool
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with Pool(processes=3) as pool:
        p = pool.map_async(wrk, (i for i in range(50)), chunksize=1)
        p.wait()

Prints:

0 186952 GONNA DIE
1 249800
2 249800
3 249800 GONNA DIE
4 186952
5 186952
6 244428 GONNA DIE
7 186952
8 186952
9 186952 GONNA DIE
10 249800
11 186952
12 249800 GONNA DIE
13 186952
14 186952
15 186952 GONNA DIE
16 249800
17 249800
18 249800 GONNA DIE
20 186952
19 244428
21 186952 GONNA DIE
22 244428
23 244428
24 249800 GONNA DIE
25 244428
26 186952
27 244428 GONNA DIE
28 186952
29 186952
30 249800 GONNA DIE
32 244428
31 186952
33 244428 GONNA DIE
34 186952
35 249800
36 186952 GONNA DIE
37 249800
38 244428
39 249800 GONNA DIE
40 244428
41 186952
42 244428 GONNA DIE
43 249800
44 186952
45 249800 GONNA DIE
46 186952
47 244428
48 249800 GONNA DIE
49 186952

Likewise, if we set chunksize=50, the first idle process grabs all of the submitted tasks and the only output is:

0 115408 GONNA DIE
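The pattern in these outputs can be reproduced without any processes at all. Here is a rough sketch, assuming a batch is evaluated in input order and dropped at the first exception; processed_inputs and fails_on_multiple_of_3 are hypothetical helpers, not part of multiprocessing.

```python
def processed_inputs(items, chunksize, fails):
    """Return the inputs a worker would actually be called with.

    Assumption: each batch is evaluated in order and abandoned at the first
    input for which fails(item) is true (that input itself is still called).
    """
    items = list(items)
    called = []
    for start in range(0, len(items), chunksize):
        for item in items[start:start + chunksize]:
            called.append(item)
            if fails(item):
                break  # the rest of this batch is never run
    return called


def fails_on_multiple_of_3(a):
    """Same failure condition as wrk in the question."""
    return a % 3 == 0


if __name__ == "__main__":
    print(processed_inputs(range(50), 5, fails_on_multiple_of_3))
    print(processed_inputs(range(50), 50, fails_on_multiple_of_3))
```

With chunksize 5 this yields the same 19 inputs that appear in the question's output (in sorted order), with chunksize 50 only 0, and with chunksize 1 all 50 inputs.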

The same thing happens with concurrent.futures.ProcessPoolExecutor:

from concurrent.futures import ProcessPoolExecutor
import os


def wrk(a):
    if a % 3 == 0:
        print(a, os.getpid(), 'GONNA DIE')
        raise ValueError('ERROR')
    else:
        print(a, os.getpid())


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        p = executor.map(wrk, (i for i in range(50)), chunksize=5)

Prints:

0 39280 GONNA DIE
5 60616
10 39280
6 60616 GONNA DIE
11 39280
12 39280 GONNA DIE
15 60616 GONNA DIE
20 39280
25 60616
21 39280 GONNA DIE
26 60616
30 39280 GONNA DIE
27 60616 GONNA DIE
35 104328
40 39280
45 60616 GONNA DIE
36 104328 GONNA DIE
41 39280
42 39280 GONNA DIE

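But if you use the default chunksize=1 value, you will see all 50 lines of output. So this "feature" is not peculiar to the Pool class.

Again, very surprising. I will take a look at the multiprocessing code and try to get back to you on this.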

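Update

I looked at the code for concurrent.futures.ProcessPoolExecutor.map, which is easier to follow than the code for multiprocessing.pool.Pool.map, and sure enough: as soon as one task in a chunksize batch raises an exception, the remaining tasks in that batch are abandoned.

If you use multiprocessing.pool.Pool.map with a chunksize value > 1 and an exception is raised in your worker function, then not only is the rest of the batch abandoned, but so are any batches still queued up to run. The map function itself raises the exception. To get whatever results are available from the worker function calls that did complete successfully, use imap (or imap_unordered):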

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.imap(worker, range(1, 30), chunksize=4)
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
Exception: 8
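Note that 5, 6 and 7 are missing from this output even though worker succeeds for them. A plausible reading is that a batch's results travel back as one unit, so an exception anywhere in the batch replaces the whole batch. The sketch below illustrates that idea only; run_chunk and try_chunk are hypothetical stand-ins, not the pool's actual code.

```python
def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x


def run_chunk(fn, chunk):
    """Evaluate a whole chunk as one unit (hypothetical stand-in for a pool worker)."""
    return [fn(x) for x in chunk]


def try_chunk(chunk):
    """Return ('ok', results) or ('error', message) for one chunk."""
    try:
        return ("ok", run_chunk(worker, chunk))
    except ValueError as exc:
        # Results already computed for earlier items in the chunk are lost here.
        return ("error", str(exc))


if __name__ == "__main__":
    print(try_chunk([1, 2, 3, 4]))
    print(try_chunk([5, 6, 7, 8]))
```

The first chunk comes back whole, while the second one yields only the error for 8, with no trace of 5, 6 or 7.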

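If chunksize=1 is specified, then when the worker function raises an exception, tasks that are queued up to run, or that are running but not yet complete, are not abandoned: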

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.imap(worker, range(1, 30), chunksize=1)
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29
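The manual loop can be folded into a small helper that works the same way with imap_unordered. This is a sketch only; drain is a hypothetical name, and it assumes the iterator remains usable after raising, as the chunksize=1 run above suggests for imap.

```python
def drain(results):
    """Collect (values, errors) from an iterator whose next() may raise.

    Assumption: the iterator keeps producing items after it has raised,
    which is what the chunksize=1 imap run above appears to do.
    """
    it = iter(results)
    values, errors = [], []
    while True:
        try:
            values.append(next(it))
        except StopIteration:
            return values, errors
        except Exception as exc:
            errors.append(exc)  # remember the failure and keep going
```

For example, values, errors = drain(pool.imap_unordered(worker, range(1, 30), chunksize=1)) would separate successes from failures; with imap_unordered the values arrive in completion order rather than input order.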

The closest equivalent program using concurrent.futures would be:

from concurrent.futures import ProcessPoolExecutor

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    results = executor.map(worker, range(1, 30))
    it = iter(results)
    while True:
        try:
            x = next(it)
        except StopIteration:
            break
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

(The default chunksize value used is 1.)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8

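However, as you can see, once an exception is raised, all processing stops as far as the values returned by map are concerned. You can, and should, use the submit method to get around this: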

from concurrent.futures import ProcessPoolExecutor

def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x

if __name__ == '__main__':
    executor = ProcessPoolExecutor()
    futures = [executor.submit(worker, idx) for idx in range(1, 30)]
    for future in futures:
        try:
            x = future.result()
        except Exception as e:
            print('Exception:', e)
        else:
            print('x =', x)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29
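If the order of the results does not matter, a common variation is to map each future back to its input and consume the futures with as_completed. A minimal sketch, assuming hashable inputs; run_all is a hypothetical helper name, and worker is the same function as above.

```python
import concurrent.futures as cf


def worker(x):
    if x % 8 == 0:
        raise ValueError(str(x))
    return x


def run_all(executor, fn, inputs):
    """Submit fn(x) for every input; return (results, errors), both keyed by input."""
    future_to_input = {executor.submit(fn, x): x for x in inputs}
    results, errors = {}, {}
    for future in cf.as_completed(future_to_input):
        x = future_to_input[future]
        try:
            results[x] = future.result()
        except Exception as exc:
            errors[x] = exc  # one failing task does not affect the others
    return results, errors
```

Inside an if __name__ == '__main__': guard this could be called as run_all(executor, worker, range(1, 30)) within a with cf.ProcessPoolExecutor() as executor: block. Keying both dictionaries by input makes it easy to see which calls failed.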

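Conclusion

All of these problems are avoided if your worker function does not raise exceptions. If it needs to return a value and an exceptional condition occurs, it can even return an Exception instance that the main process can test for: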

from multiprocessing import Pool

def worker(x):
    if x % 8 == 0:
        return ValueError(str(x))
    return x

if __name__ == '__main__':
    pool = Pool(8)
    results = pool.map(worker, range(1, 30), chunksize=4)
    for result in results:
        if isinstance(result, Exception):
            print('Exception:', result)
        else:
            print('x =', result)

Prints:

x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
Exception: 8
x = 9
x = 10
x = 11
x = 12
x = 13
x = 14
x = 15
Exception: 16
x = 17
x = 18
x = 19
x = 20
x = 21
x = 22
x = 23
Exception: 24
x = 25
x = 26
x = 27
x = 28
x = 29
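When the worker function cannot easily be edited, the same idea can be applied from the outside with a thin wrapper. This is a sketch under the assumption that the exceptions involved can be pickled; capture_exceptions and safe_worker are hypothetical names.

```python
from functools import partial


def capture_exceptions(fn, x):
    """Call fn(x) and return the exception instance instead of raising it."""
    try:
        return fn(x)
    except Exception as exc:
        return exc


def worker(x):
    # Unmodified worker that raises, as in the earlier examples.
    if x % 8 == 0:
        raise ValueError(str(x))
    return x


# A partial over two module-level functions can be pickled, so it can be
# handed to a pool in place of worker.
safe_worker = partial(capture_exceptions, worker)
```

Passing safe_worker to pool.map(..., chunksize=4) in place of worker should then give the same output as the program above, since no call raises inside the pool any more.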
