python - Using Dask LocalCluster() in a modular python codebase
Question
I am trying to use Dask Distributed's LocalCluster to run code in parallel using all the cores of a single machine.
Consider a sample python data pipeline with the folder structure below.
sample_dask_program
├── main.py
├── parallel_process_1.py
├── parallel_process_2.py
├── process_1.py
├── process_2.py
└── process_3.py
main.py is the entry point, which executes the pipeline stages sequentially.
E.g.:

def run_pipeline():
    stage_one_run_util()
    stage_two_run_util()
    ...
    stage_six_run_util()

if __name__ == '__main__':
    ...
    run_pipeline()
parallel_process_1.py and parallel_process_2.py are modules that create a Client() and use futures to achieve parallelism.

with Client() as client:
    # list to store futures after they are submitted
    futures = []
    for item in items:
        future = client.submit(
            ...
        )
        futures.append(future)
    results = client.gather(futures)
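For reference, a minimal runnable version of this futures pattern might look like the following (square and run_parallel_stage are illustrative names, not the original code):

```python
from dask.distributed import Client


def square(x):
    # Placeholder for the real per-item computation.
    return x * x


def run_parallel_stage(items):
    # Creating a Client with no arguments starts a LocalCluster;
    # submit one task per item, then gather the results in order.
    with Client() as client:
        futures = [client.submit(square, item) for item in items]
        return client.gather(futures)


if __name__ == '__main__':
    print(run_parallel_stage([1, 2, 3]))  # -> [1, 4, 9]
```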
process_1.py, process_2.py and process_3.py are modules that perform simple computations and do not need to run in parallel across all the CPU cores.
Traceback:
File "/sm/src/calculation/parallel.py", line 140, in convert_qty_to_float
results = client.gather(futures)
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/client.py", line 1894, in gather
asynchronous=asynchronous,
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/client.py", line 778, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/utils.py", line 348, in sync
raise exc.with_traceback(tb)
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/utils.py", line 332, in f
result[0] = yield future
File "/home/iouser/.local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
concurrent.futures._base.CancelledError
This is the error thrown by a worker:
distributed.worker - ERROR - failed during get data with tcp://127.0.0.1:33901 -> tcp://127.0.0.1:38821
Traceback (most recent call last):
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 248, in write
future = stream.write(frame)
File "/home/iouser/.local/lib/python3.7/site-packages/tornado/iostream.py", line 546, in write
self._check_closed()
File "/home/iouser/.local/lib/python3.7/site-packages/tornado/iostream.py", line 1035, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/worker.py", line 1248, in get_data
compressed = await comm.write(msg, serializers=serializers)
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 255, in write
convert_stream_closed_error(self, e)
File "/home/iouser/.local/lib/python3.7/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe
I am unable to reproduce this error locally or find a minimal reproducible example, as the occurrence of this error is abrupt.
Is this the right way to use Dask LocalCluster in a modular python program?
EDIT
I have observed that these errors come up when the LocalCluster is created with a relatively higher number of threads and processes. I am doing computations that use NumPy and Pandas, which is not a good practice as described here.
At times, when the LocalCluster is created with 4 workers and 16 processes, no error gets thrown. When it is created with 8 workers and 40 processes, the error I described above gets thrown.
As far as I understand, dask randomly selects this combination (is this an issue with dask?), as I tested on the same AWS Batch instance (with 8 cores (16 vCPUs)).
The issue does not pop up when I forcibly create the cluster with only threads.
E.g.:

cluster = LocalCluster(processes=False)
with Client(cluster) as client:
    client.submit(...)
    ...

However, creating the LocalCluster with only threads slows down the execution by 2-3 times.
So, is the solution to the problem finding the right number of processes/threads suitable for the program?
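One way to take that variability out of the picture is to pin the worker/thread layout yourself via the n_workers and threads_per_worker arguments of LocalCluster, rather than letting Dask infer it from the machine. A minimal sketch (the numbers and the run_with_fixed_layout name are illustrative; tune them against your 16-vCPU instance):

```python
from dask.distributed import Client, LocalCluster


def run_with_fixed_layout(items, n_workers=2, threads_per_worker=2):
    # Pin the layout explicitly: here 2 workers x 2 threads
    # = 4 concurrent tasks, regardless of the host's core count.
    cluster = LocalCluster(n_workers=n_workers,
                           threads_per_worker=threads_per_worker)
    with Client(cluster) as client:
        futures = [client.submit(lambda x: x + 1, i) for i in items]
        results = client.gather(futures)
    cluster.close()
    return results


if __name__ == '__main__':
    print(run_with_fixed_layout([0, 1, 2]))  # -> [1, 2, 3]
```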
Solution
It is more common to create a Dask Client once, and then run many workloads on it.

with Client() as client:
    stage_one(client)
    stage_two(client)

That being said, what you are doing should be fine. If you are able to reproduce the error with a minimal example, that would be useful (but no expectations).
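A sketch of that pattern applied to a modular layout might look like this (stage_one and stage_two are hypothetical stand-ins for the work in parallel_process_1.py and parallel_process_2.py; each receives the shared client instead of creating its own):

```python
from dask.distributed import Client


def stage_one(client):
    # Stand-in for the work done in parallel_process_1.py.
    futures = [client.submit(abs, x) for x in [-1, -2, -3]]
    return client.gather(futures)


def stage_two(client):
    # Stand-in for the work done in parallel_process_2.py.
    return client.submit(sum, [1, 2, 3]).result()


def run_pipeline():
    # One client for the whole pipeline, created at the entry point
    # and passed down to every stage that needs parallelism.
    with Client() as client:
        return stage_one(client), stage_two(client)


if __name__ == '__main__':
    print(run_pipeline())  # -> ([1, 2, 3], 6)
```

Creating the client once avoids repeatedly starting and tearing down a LocalCluster between stages, which is both faster and less prone to the kind of connection churn seen in the traceback above.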