python - Dask 能否自动创建一棵树来并行计算并减少工作人员之间的副本?
问题描述
我将其分为两个部分,背景和问题。问题一直在底部。
背景:
假设我想(使用 Dask 分布式)做一个令人尴尬的并行计算,比如对 16 个巨大的数据帧求和。我知道使用 CUDA 会很快实现,但让我们继续使用 Dask 来举例说明。
完成此操作的基本方法(使用延迟)是:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(matrices):
return reduce(lambda a, b: a + b, matrices)
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# Here's the Big Sum
matrices = calc_sum(matrices)
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
这是 dask 图:
这肯定会起作用,但是随着矩阵的大小(参见上面的 gen_matrix)变得太大,Dask 分布式工作人员开始遇到三个问题:
- 他们超时将数据发送给执行总和的主要工作人员
- 主要工作人员收集所有矩阵的内存不足
- 总和没有并行运行(只有矩阵 ganeration 是)
请注意,这些问题都不是 Dask 的错,它按宣传的那样工作。我刚刚设置的计算很差。
一种解决方案是将其分解为树计算,如下所示,以及该图的 dask 可视化:
from functools import reduce
import math
from dask import delayed, compute, visualize
import dask.distributed as dd
import numpy as np
@delayed
def gen_matrix():
return np.random.rand(1000, 1000)
@delayed
def calc_sum(a, b):
return a + b
if __name__ == '__main__':
num_matrices = 16
# Plop them into a big list
matrices = [gen_matrix() for _ in range(num_matrices)]
# This tells us the depth of the calculation portion
# of the tree we are constructing in the next step
depth = int(math.log(num_matrices, 2))
# This is the code I don't want to have to manually write
for _ in range(depth):
matrices = [
calc_sum(matrices[i], matrices[i+1])
for i in range(0, len(matrices), 2)
]
# Go!
with dd.Client('localhost:8786') as client:
f = client.submit(compute, matrices)
result = client.gather(f)
和图表:
问题:
我希望能够通过库或 Dask 本身来完成此树的生成。我怎样才能做到这一点?
对于那些想知道的人,为什么不直接使用上面的代码呢?因为有些极端情况我不想编写代码,也因为它只是要编写更多代码:)
我也看到了这个:Parallelize tree creation with dask
functools 或 itertools 中是否有知道如何做到这一点的东西(并且可以与 dask.delayed 一起使用)?
解决方案
Dask bag 有一种减少/聚合方法,可以生成树状 DAG:fold。
工作流程是“打包”延迟的对象,然后折叠它们。
推荐阅读
- java - 多线程银行账户java
- extjs - 运行时无法运行 Sencha 应用程序错误
- python - 将一列中的值替换为另一列中的值
- mysql - SQL GROUP_CONCAT 不适用于 2 个表连接
- java - 为什么 forEach, andThen 需要强制转换为 Consumer?
- java - 获取用户输入的 For 循环允许您为第一个问题输入两个值,但只计算一个值
- excel - 对象的 WorksheetFunction.Month 错误不支持属性或方法
- c# - 获取从现在到 2 天前的小时和分钟的随机日期
- ruby-on-rails - 用户中的名称错误#new
- android - 在 Unity 中为 Android 构建 Gradle 失败,说找不到 intellij-core.jar