python - 将动态加载的函数提交给 ProcessPoolExecutor
问题描述
我想提交一个动态加载的函数到 concurrent.futures.ProcessPoolExecutor
. 这是示例。有module.py
其中包含的功能。
# Content of module.py
def func():
return 1
然后,还有剩下的file.py
# Content of file.py
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
import importlib
from pathlib import Path
import inspect
def load_function_from_module(path):
spec = importlib.util.spec_from_file_location(path.stem, str(path))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def func_top_level():
return 2
if __name__ == '__main__':
# Dynamically load function from other module.
path = Path(__file__).parent / "module.py"
func = dict(inspect.getmembers(load_function_from_module(path)))["func"]
with ProcessPoolExecutor(2) as executor:
future = executor.submit(func)
future_ = executor.submit(func_top_level)
# Here comes the exception.
print(future.result())
追溯是
Traceback (most recent call last):
_pickle.PicklingError: Can't pickle <function func at 0x7f5a548eb050>: it's not the same object as module.func
func
解决方案 1:用顶级函数包装
def myfunc(): return func()
函数加载后放置并提交myfunc
。
这适用于这个例子,但是一旦你将整个if __name__ ...
块移动到它自己的main()
函数中,它myfunc()
就会再次变成本地的并且黑客不起作用。由于问题发生在我的应用程序的深处,这是不可能的。
尝试2:替换pickle
为cloudpickle
我个人最喜欢的解决方案是改变ProcessPoolExecutor
序列化对象的方式。例如,cloudpickle
可以序列化func
.
虽然,这个答案表明可以注册自定义减速器,但以下 PR 和问题表明该功能不起作用或者我无法替换pickle
为cloudpickle
.
- https://bugs.python.org/issue28053
- https://github.com/python/cpython/pull/9959
- https://github.com/python/cpython/pull/15058
非常感谢您的帮助。
解决方案
我找到了 cloudpickle 的解决方案。从这两个文件的内容开始。然后,将 bodyif __name__ ...
移到一个新的 function下main()
。这使问题变得更加困难,因为第一个解决方案不起作用。
由于 pickle 无法序列化func()
from动态导入的函数module.py
,我们在主进程中用 cloudpickle 将函数序列化为字节。然后,我们将提交一个接受字节并反序列化函数的函数。然后,可以执行该功能。更改或添加了这两个功能:
import cloudpickle
def main():
# Dynamically load function from other module.
path = Path(__file__).parent / "module.py"
func = dict(inspect.getmembers(load_function_from_module(path)))["func"]
with ProcessPoolExecutor(2) as executor:
bytes_ = cloudpickle.dumps(func)
future = executor.submit(deserialize_and_execute, bytes_)
future_ = executor.submit(func_top_level)
# Here comes the exception.
print(future.result())
def deserialize_and_execute(bytes_):
func = cloudpickle.loads(bytes_)
func()
推荐阅读
- xml - 在 Flutter 中显示来自 XML 的动态列表视图
- nlp - StanfordNLP、CoreNLP、spaCy - 不同的依赖图
- python - 为什么这段代码不会呈现我输入的问题和答案?
- android - 如何发布列表
在android上进行多部分改造? - laravel - SQLSTATE [23000]:完整性约束违规:1452 无法添加或更新子行 [0 下拉值]
- javascript - NodeJS如何循环查询
- python - 如何在逐行应用函数中正确过滤间歇性“NoneType”值?
- python - 自制python安装
- swift - iOS 实时检测来自相机的一些标记
- deep-learning - 为什么nan在前几次迭代中出现在损失层?