首页 > 解决方案 > Dask 能否自动创建一棵树来并行计算并减少工作人员之间的副本?

问题描述

我将其分为两个部分,背景和问题。问题一直在底部。

背景:

假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会很快实现,但让我们继续使用 Dask 来举例说明。

完成此操作的基本方法(使用延迟)是:

from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np

@delayed
def gen_matrix():
    return np.random.rand(1000, 1000)

@delayed
def calc_sum(matrices):
    return reduce(lambda a, b: a + b, matrices)

if __name__ == '__main__':

    num_matrices = 16

    # Plop them into a big list
    matrices = [gen_matrix() for _ in range(num_matrices)]

    # Here's the Big Sum
    matrices = calc_sum(matrices)

    # Go!
    with dd.Client('localhost:8786') as client:
        f = client.submit(compute, matrices)
        result = client.gather(f)

这是 dask 图:

在此处输入图像描述

这肯定会起作用,但是随着矩阵的大小(参见上面的 gen_matrix)变得太大,Dask 分布式工作人员开始遇到三个问题:

  1. 他们超时将数据发送给执行总和的主要工作人员
  2. 主要工作人员收集所有矩阵的内存不足
  3. 总和没有并行运行(只有矩阵 ganeration 是)

请注意,这些问题都不是 Dask 的错,它按宣传的那样工作。我刚刚设置的计算很差。

一种解决方案是将其分解为树计算,如下所示,以及该图的 dask 可视化:

from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np

@delayed
def gen_matrix():
    return np.random.rand(1000, 1000)

@delayed
def calc_sum(a, b):
    return a + b

if __name__ == '__main__':

    num_matrices = 16

    # Plop them into a big list
    matrices = [gen_matrix() for _ in range(num_matrices)]

    # This tells us the depth of the calculation portion
    # of the tree we are constructing in the next step
    depth = int(math.log(num_matrices, 2))

    # This is the code I don't want to have to manually write
    for _ in range(depth):
        matrices = [
            calc_sum(matrices[i], matrices[i+1])
            for i in range(0, len(matrices), 2)
        ]

    # Go!
    with dd.Client('localhost:8786') as client:
        f = client.submit(compute, matrices)
        result = client.gather(f)

和图表:

可视化

问题:

我希望能够通过库或 Dask 本身来完成此树的生成。我怎样才能做到这一点?

对于那些想知道的人,为什么不直接使用上面的代码呢?因为有些极端情况我不想编写代码,也因为它只是要编写更多代码:)

我也看到了这个:Parallelize tree creation with dask

functools 或 itertools 中是否有知道如何做到这一点的东西(并且可以与 dask.delayed 一起使用)?

标签: pythonparallel-processingdaskdask-distributeddask-delayed

解决方案


Dask bag 有一种减少/聚合方法,可以生成树状 DAG:fold

工作流程是“打包”延迟的对象,然后折叠它们。


推荐阅读