dask - Dask Bags 的运行速度比串行计算慢得多
问题描述
我有一个非常大的数组(这里有 200 万个单元格),并且想为数组中的每个单元格执行一个工作流。这是我的测试代码:
import numpy as np
import dask
from dask.distributed import Client, LocalCluster
import dask.bag as db
# invoke 8 workers
cluster = LocalCluster(n_workers=8)
client = Client(cluster)
# test workflow to be applied to each cell. The real case is much more complex than this.
def g(x):
np.sqrt(np.abs(x)) ** np.log(np.abs(x))
# test array with 2,000,000 cells. Values are normally distributed.
test_array = np.random.randn(2000000)
然后我使用串行和并行计算来执行这个工作流。
%%time
# serial computation
results_serial = np.zeros((2000000, 1))
for i in range(len(test_array)):
results_serial[i] = g(test_array[i])
这需要大约 11 秒才能在我的机器上运行。但是对于使用并行计算dask.bag
:
%%time
# parallel computation
b = db.from_sequence(test_array, npartitions=24)
b = b.map(g)
results_parallel = b.compute()
在我的机器上运行大约需要 90 秒,这比串行计算慢得多。我想知道为什么我们会看到这一点,以及使用dask.bag
Dask 或其他模块来加速并行案例的建议解决方案是什么?
这是此代码的笔记本版本的链接,其中包含更多注释: https ://github.com/whyjz/dask-playground/blob/main/dask-test.ipynb
解决方案
您可以使用Dask Array来并行化 NumPy 操作。在这种情况下,它比 Dask Bag 具有更好的性能。它有一个map_blocks
函数,可用于调用 Dask Array 的每个块上的任何函数:https ://docs.dask.org/en/latest/generated/dask.array.map_blocks.html#dask.array.map_blocks
对于您的测试代码:
import dask.array as da
test_dask_array = da.from_array(test_array, chunks='5MB')
results_parallel = test_dask_array.map_blocks(lambda x: np.sqrt(np.abs(x)) ** np.log(np.abs(x)))
results_parallel.compute()
推荐阅读
- javascript - Firefox javascript 控制台 - 是否可以从文件中执行 javascript?
- flutter - Refresh firebase token Flutter and RESTfull API
- php - 翻译和调整预计发货日期
- sql - 每天的运行总和
- php - 如何在 Shopware 6 中建立关联?
- reactjs - 我如何使用反应立即通过帧运动更新父高度?
- angular - Rxweb 验证不会根据需要自动设置字段?(反应式表单验证)
- java - tWriteJsonField 忽略空字段
- c# - 按嵌套集合中对象中的属性过滤集合
- python - 协助编写awk脚本版python代码生成计数矩阵