首页 > 解决方案 > 优化散点图

问题描述

我有一个大数据问题。具体问题不是很重要,但我已经用 dask 解决了。现在我有两个问题。

from dask import distributed
import numpy as np

local_cluster = distributed.LocalCluster(n_workers=20, processes=True, memory_limit=0)
dask_client = distributed.Client(local_cluster)

hat_matrix = np.random.rand(1000,25000)
weight_matrix = np.random.rand(1000)
Y = np.random.rand(1000, 25000)

[scatter_hat] = dask_client.scatter([hat_matrix], broadcast=True)
[scatter_weight] = dask_client.scatter([weight_matrix], broadcast=True)

futures = [dask_client.submit(apply_function, i, scatter_hat, Y[i, :], scatter_weight)
           for i in range(Y.shape[0])]

results = dask_client.gather(futures)

我可以拆分Y(这很好,因为我没有足够的内存来一次真正加载它),但我所有的工人都需要hat_matrix. 分散hat_matrix然后Y按行发送效果很好。除了hat_matrixY都是...大,这很好。我有足够的内存来处理它。但是我找不到任何方法来允许短暂的内存峰值(在反序列化期间发生),所以如果我设置内存限制,保姆会杀死我所有的工人。然后是我所有的新工人。等等等等。所以我有三个问题:

有没有办法设置一个内存限制,允许在序列化数据进入和解包时出现峰值?如果我有 64 GB 的内存来驱动 20 个进程,我想设置一个内存限制,比如每个进程 2.8GB。当我分散 2GB 的数据时,每个反序列化进程的峰值约为 4GB,并且保姆会杀死一切。

有没有办法错开散射以最小化瞬态内存峰值?

有没有一种方便的方法可以通过磁盘而不是通过 TCP 分散数据,还是我必须自定义写入?(作为推论:有没有一种方便的方法可以从我所有工作人员的序列化文件中加载内存映射的 dask 数组?)

标签: pythondask

解决方案


有没有办法设置一个内存限制,允许在序列化数据进入和解包时出现峰值?

一般来说,反序列化运行任意代码,所以 dask 无法真正控制发生的事情。在实践中,尽管与现代硬件上可用的典型内存相比,您的矩阵并没有那么大,但我很惊讶您遇到了问题。Dask 对 NumPy 数组非常小心。我不希望它使用比数组大小更多的内存。

有没有办法错开散射以最小化瞬态内存峰值?

散射当前通过广播树。您的客户发送给几个工人,然后他们发送给更多的工人,依此类推。默认情况下,这里的分支因子只有两个,所以我会惊讶地看到这里出现了巨大的爆炸。

有没有一种方便的方法可以通过磁盘而不是通过 TCP 分散数据,还是我必须自定义写入?(作为推论:有没有一种方便的方法可以从我所有工作人员的序列化文件中加载内存映射的 dask 数组?)

也许您可以使用某种内存映射的 NumPy 数组而不是内存中的 NumPy 数组?


推荐阅读