python - 在 joblib `Parallel` 上下文中腌制 `matlab` 对象时出错
问题描述
我正在从 Python 上下文中并行运行一些 Matlab 代码(我知道,但这就是正在发生的事情),并且我遇到了一个涉及matlab.double
. 相同的代码在 a 中运行良好multiprocessing.Pool
,所以我无法找出问题所在。这是一个最小的重现测试用例。
import matlab
from multiprocessing import Pool
from joblib import Parallel, delayed
# A global object that I would like to be available in the parallel subroutine
x = matlab.double([[0.0]])
def f(i):
print(i, x)
with Pool(4) as p:
p.map(f, range(10))
# This prints 1, [[0.0]]\n2, [[0.0]]\n... as expected
for _ in Parallel(4, backend='multiprocessing')(delayed(f)(i) for i in range(10)):
pass
# This also prints 1, [[0.0]]\n2, [[0.0]]\n... as expected
# Now run with default `backend='loky'`
for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
pass
# ^ this crashes.
因此,唯一有问题的是使用'loky'
后端的问题。完整的追溯是:
exception calling callback for <Future at 0x7f63b5a57358 state=finished raised BrokenProcessPool>
joblib.externals.loky.process_executor._RemoteTraceback:
'''
Traceback (most recent call last):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
call_item = call_queue.get(block=True, timeout=timeout)
File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
return _ForkingPickler.loads(res)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
from _internal.mlarray_sequence import _MLArrayMetaClass
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
from _internal.mlarray_utils import _get_strides, _get_size, \
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
import matlab
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
callback(self)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
self.parallel.dispatch_next()
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
self._dispatch(tasks)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
future = self._workers.submit(SafeFunction(func))
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
fn, *args, **kwargs)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
joblib.externals.loky.process_executor._RemoteTraceback:
'''
Traceback (most recent call last):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 391, in _process_worker
call_item = call_queue.get(block=True, timeout=timeout)
File "~/miniconda3/envs/myenv/lib/python3.6/multiprocessing/queues.py", line 113, in get
return _ForkingPickler.loads(res)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/mlarray.py", line 31, in <module>
from _internal.mlarray_sequence import _MLArrayMetaClass
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_sequence.py", line 3, in <module>
from _internal.mlarray_utils import _get_strides, _get_size, \
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/_internal/mlarray_utils.py", line 4, in <module>
import matlab
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/matlab/__init__.py", line 24, in <module>
from mlarray import double, single, uint8, int8, uint16, \
ImportError: cannot import name 'double'
'''
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "test.py", line 20, in <module>
for _ in Parallel(4)(delayed(f)(i) for i in range(10)):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 934, in __call__
self.retrieve()
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 833, in retrieve
self._output.extend(job.get(timeout=self.timeout))
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 521, in wrap_future_result
return future.result(timeout=timeout)
File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 432, in result
return self.__get_result()
File "~/miniconda3/envs/myenv/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
raise self._exception
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/_base.py", line 625, in _invoke_callbacks
callback(self)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 309, in __call__
self.parallel.dispatch_next()
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 731, in dispatch_next
if not self.dispatch_one_batch(self._original_iterator):
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 759, in dispatch_one_batch
self._dispatch(tasks)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/parallel.py", line 716, in _dispatch
job = self._backend.apply_async(batch, callback=cb)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/_parallel_backends.py", line 510, in apply_async
future = self._workers.submit(SafeFunction(func))
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/reusable_executor.py", line 151, in submit
fn, *args, **kwargs)
File "~/miniconda3/envs/myenv/lib/python3.6/site-packages/joblib/externals/loky/process_executor.py", line 1022, in submit
raise self._flags.broken
joblib.externals.loky.process_executor.BrokenProcessPool: A task has failed to un-serialize. Please ensure that the arguments of the function are all picklable.
查看回溯,似乎根本原因是matlab
在子进程中导入包的问题。
可能值得注意的是,如果我已经定义x = np.array([[0.0]])
(在导入之后numpy as np
),这一切都运行得很好。当然,主进程对任何matlab
导入都没有问题,所以我不确定子进程为什么会这样。
我不确定这个错误是否与matlab
包特别有关,或者是否与全局变量和cloudpickle
或有关loky
。在我的应用程序中,坚持使用 会有所帮助loky
,因此我将不胜感激!
我还应该注意,我正在使用 Python 的官方 Matlab 引擎:https ://www.mathworks.com/help/matlab/matlab-engine-for-python.html 。我想这可能会让其他人很难尝试测试用例,所以我希望我可以用 以外的类型重现这个错误matlab.double
,但我还没有找到另一个。
挖掘更多,我注意到导入matlab
包的过程比我预期的更循环,我推测这可能是问题的一部分?问题是,当import matlab
由loky
's运行时_ForkingPickler
,首先导入了一些文件matlab/mlarray.py
,该文件导入了一些其他文件,其中一个包含import matlab
,这导致matlab/__init__.py
运行,内部具有from mlarray import double, single, uint8, ...
导致崩溃的行。
这种循环可能是问题吗?如果是这样,为什么我可以在主进程中导入这个模块而不是在loky
后端?
解决方案
该错误是由于子进程中全局对象的加载顺序不正确引起的。在全局变量加载时还没有导入的traceback_ForkingPickler.loads(res) -> ... -> import matlab -> from mlarray import ...
中可以清楚的看到
。matlab
x
cloudpickle
joblib
withloky
似乎将模块视为普通的全局对象并将它们动态发送到子进程。joblib 不记录定义这些对象/模块的顺序。因此,它们在子进程中以随机顺序加载(初始化)。
一个简单的解决方法是手动腌制 matlab 对象并在函数中导入 matlab 后加载它。
import matlab
import pickle
px = pickle.dumps(matlab.double([[0.0]]))
def f(i):
import matlab
x=pickle.loads(px)
print(i, x)
当然,您也可以使用joblib.dumps并loads
序列化对象。
使用初始化器
感谢@Aaron 的建议,您还可以在加载之前使用initializer
( for loky ) 导入 Matlab x
。
目前没有简单的 API 可以指定initializer
。所以我写了一个简单的函数:
def with_initializer(self, f_init):
# Overwrite initializer hook in the Loky ProcessPoolExecutor
# https://github.com/tomMoral/loky/blob/f4739e123acb711781e46581d5ed31ed8201c7a9/loky/process_executor.py#L850
hasattr(self._backend, '_workers') or self.__enter__()
origin_init = self._backend._workers._initializer
def new_init():
origin_init()
f_init()
self._backend._workers._initializer = new_init if callable(origin_init) else f_init
return self
它有点 hacky,但适用于当前版本的 joblib 和 loky。然后你可以像这样使用它:
import matlab
from joblib import Parallel, delayed
x = matlab.double([[0.0]])
def f(i):
print(i, x)
def _init_matlab():
import matlab
with Parallel(4) as p:
for _ in with_initializer(p, _init_matlab)(delayed(f)(i) for i in range(10)):
pass
我希望joblib的开发者将来会initializer
在构造函数中添加参数Parallel
。
推荐阅读
- javascript - 单击时如何更改引导轮播上的图像?
- python - Python `isinstance` 函数签名的隐藏意义
- react-native - 如何避免在滚动反应原生时按下按钮
- javascript - 如何从输入中为数组赋值?
- php - 如何在php中将64位无符号整数转换为十六进制值?
- python-3.x - 如何通过 python 脚本在 Mininet 主机上运行 XMLRPC 服务器和 XMLRPC 客户端?
- react-native - 反应原生 | i18n 快速刷新
- getstream-io - 如何创建一个链接,将用户引导至特定的 Stream Chat 频道?
- javascript - 在for循环中匹配条件后函数执行没有停止
- multithreading - ModbusTcpClient 连接在 ROS 应用程序中意外关闭