首页 > 解决方案 > Dask 无法使用连接数据写入镶木地板

问题描述

我正在尝试执行以下操作:

使用 pandas 读取 .dat 文件,将其转换为 dask 数据帧,将其连接到我从 parquet 文件中读取的另一个 dask 数据帧,然后输出到新的 parquet 文件。我执行以下操作:

import dask.dataframe as dd
import pandas as pd

hist_pth = "\path\to\hist_file"
hist_file = dd.read_parquet(hist_pth)

pth = "\path\to\file"
daily_file = pd.read_csv(pth, sep="|", encoding="latin")
daily_file = daily_file.astype(hist_file.dtypes.to_dict(), errors="ignore")
dask_daily_file = dd.from_pandas(daily_file, npartitions=1)

combined_file = dd.concat([dask_daily_file, hist_file])

output_path = "\path\to\output"
combined_file.to_parquet(output_path)

总是启动然后combined_file.to_parquet(output_path)停止/或无法正常工作。在 jupyter 笔记本中,当我这样做时,我得到一个内核失败错误。当我在 python 脚本中执行此操作时,脚本完成但未写入整个组合文件(我知道是因为大小 - CSV 为 140MB,parquet 文件约为 1GB - 输出to_parquet仅为 20MB)。

在某些情况下,这是针对 ETL 流程的,并且每天都会添加大量数据,我很快就会耗尽历史数据集和组合数据集的内存,因此我正在尝试将流程从 pandas 迁移到 Dask 到处理我很快就会拥有的大于内存的数据。当前数据(每日 + 历史数据)仍然适合内存,但几乎没有(我已经使用了分类数据,这些数据存储在 parquet 文件中,然后我将该模式复制到新文件中)。

我还注意到,在那之后,即使是简单的任务dd.concat([dask_daily_file, hist_file]),我也无法调用.compute()它,而不会像写入镶木地板时那样崩溃。例如,在原始的、预先连接的数据上,我可以调用hist_file["Value"].div(100).compute()并获取预期值,但在 combine_file 上使用相同的方法会崩溃。甚至只是combined_file.compute()为了把它变成熊猫 df 崩溃。我也尝试过重新分区,但没有运气。

我能够在 pandas 中毫无问题地完成这些精确的操作。但同样,我很快就会耗尽内存,这就是为什么我要转向 dask。

这是 dask 无法处理的事情吗?如果它可以处理它,我是否正确处理它?具体来说,似乎 concat 正在引起问题。任何帮助表示赞赏!

更新

在玩了更多之后,我最终遇到了以下错误:

AttributeError: 'numpy.ndarray' object has no attribute 'categories'

现有的 GitHub 问题似乎与此有关 - 我询问并等待确认。

作为一种解决方法,我将所有分类列转换为字符串/对象并再次尝试,然后以

ArrowTypeError: ("Expected a bytes object, got a 'int' object, 'Conversion failed for column Account with type object')

当我检查该列时,df["Account"].dtype它会返回dtype('O'),所以我认为我已经有了正确的 dtype。此列中的值主要是数字,但也有一些记录只有字母。

有没有办法解决这个问题?

标签: pandasdaskparquet

解决方案


在连接数据帧并将结果保存为 Parquet 格式后,我在 Pandas 中遇到了这个错误。

data = pd.concat([df_1, d2, df3], axis=0, ignore_index=True)
data.to_parquet(filename)

..显然是因为行包含不同的数据类型,int 或 float。通过在保存之前强制它们具有相同的数据类型,错误就会消失

cols = ["first affected col", "second affected col", ..]
data[cols] = data[cols].apply(pd.to_numeric, errors='coerce', axis=1)

推荐阅读