首页 > 解决方案 > Pool.apply_async():嵌套函数未执行

问题描述

我正在熟悉 Python 的multiprocessing模块。以下代码按预期工作:

#outputs 0 1 2 3
from multiprocessing import Pool
def run_one(x):
    print x
    return

pool = Pool(processes=12)
for i in range(4):
    pool.apply_async(run_one, (i,))
pool.close()
pool.join() 

但是,现在,如果我在上面的代码周围包装一个函数,print则不会执行语句(或者至少重定向输出):

#outputs nothing
def run():
    def run_one(x):
        print x
        return    

    pool = Pool(processes=12)
    for i in range(4):    
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join()

如果我将run_one定义移到 之外run,则输出再次是预期的,当我调用时run()

#outputs 0 1 2 3
def run_one(x):
    print x
    return

def run():    
    pool = Pool(processes=12)
    for i in range(4):       
        pool.apply_async(run_one, (i,))
    pool.close()
    pool.join() 

我在这里想念什么?为什么第二个片段不打印任何内容?如果我只是调用该run_one(i)函数而不是 using apply_async,则所有三个代码的输出都相同。

标签: pythonasynchronousmultiprocessingpython-multiprocessing

解决方案


池需要腌制(序列化)它发送到其工作进程的所有内容。Pickling 实际上只保存函数的名称,而 unpickling 需要按名称重新导入函数。为此,该函数需要在顶层定义,嵌套函数不能被子级导入,并且已经尝试腌制它们会引发异常:

from multiprocessing.connection import _ForkingPickler

def run():
    def foo(x):
        pass
    _ForkingPickler.dumps(foo)  # multiprocessing custom pickler;
                                # same effect with pickle.dumps(foo)

run()
# Out:
Traceback (most recent call last):
...
AttributeError: Can't pickle local object 'run.<locals>.foo'

您看不到异常的原因是,因为已经开始在父项中的酸洗任务期间捕获异常,并且仅在您调用调用时立即获得的对象Pool时才重新引发异常。.get()AsyncResultpool.apply_async()

这就是为什么(使用 Python 2)你最好总是这样使用它,即使你的目标函数没有返回任何东西(仍然返回隐式None):

    results = [pool.apply_async(foo, (i,)) for i in range(4)]
    # `pool.apply_async()` immediately returns AsyncResult (ApplyResult) object
    for res in results:
        res.get()

非异步池方法喜欢Pool.map()Pool.starmap()在底层使用相同的(异步)低级函数,就像它们的异步兄弟一样,但它们额外调用.get()你,所以你总是会看到这些方法的异常。

Python 3 有一个error_callback用于异步池方法的参数,您可以使用它来处理异常。


推荐阅读