首页 > 解决方案 > 计算具有共同依赖关系的两个值时,内存使用率很高

问题描述

我在单台机器上使用 Dask(LocalCluster有 4 个进程、16 个线程、68.56GB 内存),并且在尝试一次计算两个共享依赖项的结果时遇到了工作内存问题。

在下面显示的示例中,result仅使用一次计算的计算运行良好且快速,工作人员的综合内存使用量最大约为 1GB。但是,当results使用两次计算进行计算时,工作人员会快速使用所有内存并在总内存使用量约为 40GB 时开始写入磁盘。计算最终将完成,但是一旦开始写入磁盘,就会出现预期的大幅减速。

直观地说,如果读入一个块,然后立即计算它的两个和,则可以丢弃该块并且内存使用率保持较低。但是,Dask 似乎优先加载数据,而不是稍后清理内存的聚合计算。

任何帮助理解这里发生的事情将不胜感激。如何计算具有共同依赖关系的两个结果,而无需两次读取基础数据或将其完全读入内存?

import dask
import dask.dataframe as dd
import dask.array as da
from dask.distributed import Client

client = Client("localhost:8786")

array = da.random.normal(size=(int(1e9), 10), chunks=(int(1e6), 10))
df = dd.from_array(array, columns=[str(i) for i in range(10)])

# does not blow up worker memory, overall usage stays below 1GB total
result = dask.compute(df["0"].sum())

# does blow up worker memory
results = dask.compute([df["0"].sum(), df["1"].sum()])

标签: pythondaskdask-distributeddask-delayed

解决方案


数组的构造方式,每次创建块时都必须生成数组的每一列。因此,优化的一个机会(如果可能)是以允许按列处理的方式生成/加载数组。这将减少单个任务的内存负载。

优化的另一个地方是明确指定公共依赖项,例如dask.compute(df[['0', '1']].sum())将有效运行。

然而,更重要的一点是,默认情况下dask遵循一些关于如何确定工作优先级的经验法则,请参见此处。您有几个选项可以进行干预(不确定此列表是否详尽):自定义优先级、资源约束、修改计算图(以允许工作人员从中间任务中释放内存而无需等待最终任务完成)。

修改图的一种简单方法是通过手动计算中间和来分解最终和图与所有中间任务之间的依赖关系:

[results] = dask.compute([df["0"].map_partitions(sum), df["1"].map_partitions(sum)])

请注意,这results将是两个子列表的列表,但计算每个子列表的总和很简单(尝试在延迟的对象上运行会触发计算,因此在计算后sum运行效率更高)。sumresults


推荐阅读