python - 如何使用 Dask read_csv 读取每 n 行以快速读取多个文件?
问题描述
我正在尝试将多个 CSV 文件读入单个数据帧。虽然这可以使用列表理解和 Panda 的 concat 函数,例如
import pandas as pd
files = ['file1.csv', 'file2.csv', etc....]
all_df = []
for filename in files:
all_df.append(pd.read_csv(filename))
df = pd.concat(all_df)
当文件是一个长列表(例如 100 个项目)时,我发现这太慢了。
我尝试使用 Dask,它接受列表作为输入并具有内置的并行化速度,例如
import dask.dataframe as dd
df_dask = dd.read_csv(files)
df = df_dask.compute()
这使速度提高了约 2 倍。
但是,为了进一步加快速度,我希望能够只读取文件的每 N 行。
使用 Pandas,我可以使用 lambda 函数和skiprows
read_csv 的参数来做到这一点。例如
cond = lambda x : x % downsampling != 0
,在循环中,使用, pd.read_csv(filename, skiprows=cond)
.
但是,这不适用于 Dask,并且 skiprows 参数不接受 lambda 函数。我不能将整数传递给skiprows,因为每个文件都有不同的长度,所以每个文件要跳过的行不同。
有没有快速的解决方案?我认为某种与 Dask 兼容的下采样操作可能是一种解决方案,但不确定如何实现。
请问这可能吗?
解决方案
详细说明@quizzical_panini 的使用建议dask.delayed
:
import dask
import pandas as pd
@dask.delayed
def custom_pandas_load(file_path):
# do what you would do if you had one file
cond = lambda x : x % downsampling != 0
df = pd.read_csv(file_path, skiprows=cond)
return df
[computed_dfs] = dask.compute(
[custom_pandas_load(file_path)
for file_path in files]
)
df_final = pd.concat(computed_dfs)
推荐阅读
- svelte - SvelteKit 中应用程序特定的编译时间设置
- github - github如何在容器中克隆组织的私有存储库
- javascript - 反应内联样式 - 用居中文本替换 tbody
- kubernetes - 即使任务完成,通过 kubernetes 入口的 API 发布调用也永远不会结束,但是当 API 调用针对特定容器时,作业调用结束
- sql - 默认 ROW_NUMBER 为一个产品
- reactjs - 在 redux 工具包中存储循环数据
- python - Pandas 获取通过 ID 创建的所有组
- android - Android:检查选定的数据库 .db 文件是否有效
- blogger - 在 Blogger Notable 上,而不是 Snippet,将阅读更多内容放在跳转链接中
- react-native - ListView React Native 中未显示的项目