首页 > 解决方案 > 如何使用 Dask read_csv 读取每 n 行以快速读取多个文件?

问题描述

我正在尝试将多个 CSV 文件读入单个数据帧。虽然这可以使用列表理解和 Panda 的 concat 函数,例如

import pandas as pd
files = ['file1.csv', 'file2.csv', etc....]
all_df = []
for filename in files:
    all_df.append(pd.read_csv(filename))
df = pd.concat(all_df)

当文件是一个长列表(例如 100 个项目)时,我发现这太慢了。

我尝试使用 Dask,它接受列表作为输入并具有内置的并行化速度,例如

import dask.dataframe as dd
df_dask = dd.read_csv(files)
df = df_dask.compute()

这使速度提高了约 2 倍。

但是,为了进一步加快速度,我希望能够只读取文件的每 N 行。

使用 Pandas,我可以使用 lambda 函数和skiprowsread_csv 的参数来做到这一点。例如 cond = lambda x : x % downsampling != 0,在循环中,使用, pd.read_csv(filename, skiprows=cond).

但是,这不适用于 Dask,并且 skiprows 参数不接受 lambda 函数。我不能将整数传递给skiprows,因为每个文件都有不同的长度,所以每个文件要跳过的行不同。

有没有快速的解决方案?我认为某种与 Dask 兼容的下采样操作可能是一种解决方案,但不确定如何实现。

请问这可能吗?

标签: pythonpandasdataframebigdatadask

解决方案


详细说明@quizzical_panini 的使用建议dask.delayed

import dask
import pandas as pd

@dask.delayed
def custom_pandas_load(file_path):
     # do what you would do if you had one file
    cond = lambda x : x % downsampling != 0
    df = pd.read_csv(file_path, skiprows=cond)
    return df

[computed_dfs] = dask.compute(
    [custom_pandas_load(file_path)
     for file_path in files]
)

df_final = pd.concat(computed_dfs)

推荐阅读