首页 > 解决方案 > Dask Bags 的运行速度比串行计算慢得多

问题描述

我有一个非常大的数组(这里有 200 万个单元格),并且想为数组中的每个单元格执行一个工作流。这是我的测试代码:

import numpy as np
import dask
from dask.distributed import Client, LocalCluster
import dask.bag as db

# invoke 8 workers
cluster = LocalCluster(n_workers=8)
client = Client(cluster)

# test workflow to be applied to each cell. The real case is much more complex than this.
def g(x):
    np.sqrt(np.abs(x)) ** np.log(np.abs(x))

# test array with 2,000,000 cells. Values are normally distributed.
test_array = np.random.randn(2000000)

然后我使用串行和并行计算来执行这个工作流。

%%time
# serial computation

results_serial = np.zeros((2000000, 1))
for i in range(len(test_array)):
    results_serial[i] = g(test_array[i])

这需要大约 11 秒才能在我的机器上运行。但是对于使用并行计算dask.bag

%%time
# parallel computation

b = db.from_sequence(test_array, npartitions=24)
b = b.map(g)
results_parallel = b.compute()

在我的机器上运行大约需要 90 秒,这比串行计算慢得多。我想知道为什么我们会看到这一点,以及使用dask.bagDask 或其他模块来加速并行案例的建议解决方案是什么?

这是此代码的笔记本版本的链接,其中包含更多注释: https ://github.com/whyjz/dask-playground/blob/main/dask-test.ipynb

标签: daskdask-distributed

解决方案


您可以使用Dask Array来并行化 NumPy 操作。在这种情况下,它比 Dask Bag 具有更好的性能。它有一个map_blocks函数,可用于调用 Dask Array 的每个块上的任何函数:https ://docs.dask.org/en/latest/generated/dask.array.map_blocks.html#dask.array.map_blocks

对于您的测试代码:

import dask.array as da

test_dask_array = da.from_array(test_array, chunks='5MB')
results_parallel = test_dask_array.map_blocks(lambda x: np.sqrt(np.abs(x)) ** np.log(np.abs(x)))
results_parallel.compute()

推荐阅读