首页 > 解决方案 > 如何在 Dask 中为每个分区返回一个 NumPy 数组?

问题描述

我需要计算许多 NumPy 数组(最多可以是 4 维),一个用于 Dask 数据帧的每个分区,然后将它们添加为数组。但是,我正在努力map_partitions为每个分区返回一个数组,而不是为所有分区返回一个数组。

import dask.dataframe as dd
import numpy as np, pandas as pd

df = pd.DataFrame(range(15), columns=['x'])
ddf = dd.from_pandas(df, npartitions=3)

def func(partition):
    # Here I also tried returning the array in a list and in a tuple
    return np.array([[1, 2], [3, 4]])

# Here I tried all the options available for 'meta'
results = ddf.map_partitions(func).compute()

然后results是:

array([[1, 2],
       [3, 4],
       [1, 2],
       [3, 4],
       [1, 2],
       [3, 4]])

如果相反,我会results.sum().compute()得到30.

我想得到的是:

[np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]]), np.array([[1, 2],[3, 4]])]

因此,如果我计算总和,我会得到:

array([[ 3,  6],
       [ 9, 12]])

您如何使用 Dask 实现此结果?

标签: dataframenumpydask

解决方案


你是对的,一个 dask-array 通常被视为一个单独的逻辑数组,它恰好是由片段组成的。单身你没有使用逻辑层,你可以delayed独自完成你的工作。另一方面,您想要的最终结果似乎真的是所有数据的总和,所以也许更简单的是合适的reshapeand sum(axis=)?

ddf.map_partitions(func).compute_chunk_sizes().reshape(
    -1, 2, 2).sum(axis=0).compute()

compute_chunk_sizes是必需的,因为尽管您的原始 pandas 数据框的大小已知,但 Dask 尚未评估您的函数,还不知道它返回的大小)

但是,根据您的设置,以下内容将起作用并且与您最初的尝试更相似,请参阅.to_delayed()

list_of_delayed = ddf.map_partitions(func).to_delayed().tolist()
tuple_of_np_lists = dask.compute(*list_of_delayed)

tolist部队评估包含的延迟对象)


推荐阅读