首页 > 解决方案 > 使用从 parquet 文件创建的 dask 数据帧时使用过多的内存

问题描述

我有 800K 行 x 8.7K 列的镶木地板文件。我将它加载到一个 dask 数据框中:

import dask.dataframe as dd
dask_train_df = dd.read_parquet('train.parquet')
dask_train_df.info()

这产生:

<class 'dask.dataframe.core.DataFrame'>
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)

当我尝试执行dask_train_df.head()or之类的简单操作dask_train_df.loc[2:4].compute()时,即使使用 17 GB 以上的 RAM,也会出现内存错误。

但是,如果我这样做:

import pandas as pd
train = pd.read_parquet('../input/train.parquet')
train.info()

产量:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 800000 entries, 0 to 799999
Columns: 8712 entries, 0 to 8711
dtypes: int8(8712)
memory usage: 6.5 GB

而且我可以毫无问题地运行train.head()train.loc[2:4] 因为一切都已经在内存中了。

1)所以我的问题是,为什么这些简单的操作会使用 Dask Dataframe 破坏内存使用,但是当我使用 Pandas Dataframe 将所有内容加载到内存时却可以正常工作?

我注意到了,并且我在“将 Parquet 数据目录读入 Dask.dataframe,每个分区一个文件”npartitions=1的文档中看到了这一点。read_parquet就我而言,听起来我失去了拥有多个分区的所有并行化能力,但是 Dask Dataframe 内存使用量不应该受到单个 Pandas Dataframe 的内存量的限制吗?

2)另外,一个附带问题:如果我想通过在 Dask Dataframe 中分区来并行化这个单个 parquet 文件,我该怎么做?dd.read_parquet我在签名中没有看到 blocksize 参数。我也尝试使用重新分区功能,但我相信沿行和镶木地板文件的分区,我想沿列分区?

标签: parquetdask

解决方案


首先,我想评论一下,8712 列相当多,您会发现解析模式/元数据可能需要大量时间,更不用说数据加载了。

当 fastparquet 加载数据时,它首先分配一个足够大小的数据帧,然后遍历列/块(具有适当的开销,在这种情况下显然很小)并将值分配给分配的数据帧。

当您通过 Dask(任何计算)运行计算时,在许多情况下,输入变量和其他中间对象的内存中可能存在任务内副本。这通常不是问题,因为整个数据集应该被分成许多部分,而小型中间体的内存开销是能够处理比内存更大的数据集所付出的代价。我不确定您在什么时候获得副本,这可能值得调查和预防。

在您的情况下,整个数据集是一个分区。这将导致一个加载任务,在一个线程中运行。您将不会获得任何并行性,并且任何中间内部副本都适用于整个数据集。您可以通过选择列仅加载部分数据,并以此方式制造分区并实现并行性。然而,处理 parquet 数据的典型方法是使用“行组”分区(即沿索引)和多个文件,因此避免该问题的真正方法是使用已经适当分区的数据。

请注意,由于您可以使用 fastparquet/pandas 直接加载数据,因此您可能还可以使用to_parquet方法或 fastparquet 的write函数保存分区版本。


推荐阅读