首页 > 解决方案 > 将动态加载的函数提交给 ProcessPoolExecutor

问题描述

我想提交一个动态加载的函数到 concurrent.futures.ProcessPoolExecutor. 这是示例。有module.py其中包含的功能。

# Content of module.py

def func():
    return 1

然后,还有剩下的file.py

# Content of file.py

from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect


def load_function_from_module(path):
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)

    return mod


def func_top_level():
    return 2


if __name__ == '__main__':
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        future = executor.submit(func)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())

追溯是

Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func

func解决方案 1:用顶级函数包装

def myfunc(): return func()函数加载后放置并提交myfunc

这适用于这个例子,但是一旦你将整个if __name__ ...块移动到它自己的main()函数中,它myfunc()就会再次变成本地的并且黑客不起作用。由于问题发生在我的应用程序的深处,这是不可能的。

尝试2:替换picklecloudpickle

我个人最喜欢的解决方案是改变ProcessPoolExecutor序列化对象的方式。例如,cloudpickle可以序列化func.

虽然,这个答案表明可以注册自定义减速器,但以下 PR 和问题表明该功能不起作用或者我无法替换picklecloudpickle.

非常感谢您的帮助。

标签: pythonconcurrent.futures

解决方案


我找到了 cloudpickle 的解决方案。从这两个文件的内容开始。然后,将 bodyif __name__ ...移到一个新的 function下main()。这使问题变得更加困难,因为第一个解决方案不起作用。

由于 pickle 无法序列化func()from动态导入的函数module.py,我们在主进程中用 cloudpickle 将函数序列化为字节。然后,我们将提交一个接受字节并反序列化函数的函数。然后,可以执行该功能。更改或添加了这两个功能:

import cloudpickle


def main():
    # Dynamically load function from other module.
    path = Path(__file__).parent / "module.py"
    func = dict(inspect.getmembers(load_function_from_module(path)))["func"]

    with ProcessPoolExecutor(2) as executor:
        bytes_ = cloudpickle.dumps(func)
        future = executor.submit(deserialize_and_execute, bytes_)
        future_ = executor.submit(func_top_level)

    # Here comes the exception.
    print(future.result())


def deserialize_and_execute(bytes_):
    func = cloudpickle.loads(bytes_)
    func()

推荐阅读