pandas - Dask 无法使用连接数据写入镶木地板
问题描述
我正在尝试执行以下操作:
使用 pandas 读取 .dat 文件,将其转换为 dask 数据帧,将其连接到我从 parquet 文件中读取的另一个 dask 数据帧,然后输出到新的 parquet 文件。我执行以下操作:
import dask.dataframe as dd
import pandas as pd
hist_pth = "\path\to\hist_file"
hist_file = dd.read_parquet(hist_pth)
pth = "\path\to\file"
daily_file = pd.read_csv(pth, sep="|", encoding="latin")
daily_file = daily_file.astype(hist_file.dtypes.to_dict(), errors="ignore")
dask_daily_file = dd.from_pandas(daily_file, npartitions=1)
combined_file = dd.concat([dask_daily_file, hist_file])
output_path = "\path\to\output"
combined_file.to_parquet(output_path)
总是启动然后combined_file.to_parquet(output_path)
停止/或无法正常工作。在 jupyter 笔记本中,当我这样做时,我得到一个内核失败错误。当我在 python 脚本中执行此操作时,脚本完成但未写入整个组合文件(我知道是因为大小 - CSV 为 140MB,parquet 文件约为 1GB - 输出to_parquet
仅为 20MB)。
在某些情况下,这是针对 ETL 流程的,并且每天都会添加大量数据,我很快就会耗尽历史数据集和组合数据集的内存,因此我正在尝试将流程从 pandas 迁移到 Dask 到处理我很快就会拥有的大于内存的数据。当前数据(每日 + 历史数据)仍然适合内存,但几乎没有(我已经使用了分类数据,这些数据存储在 parquet 文件中,然后我将该模式复制到新文件中)。
我还注意到,在那之后,即使是简单的任务dd.concat([dask_daily_file, hist_file])
,我也无法调用.compute()
它,而不会像写入镶木地板时那样崩溃。例如,在原始的、预先连接的数据上,我可以调用hist_file["Value"].div(100).compute()
并获取预期值,但在 combine_file 上使用相同的方法会崩溃。甚至只是combined_file.compute()
为了把它变成熊猫 df 崩溃。我也尝试过重新分区,但没有运气。
我能够在 pandas 中毫无问题地完成这些精确的操作。但同样,我很快就会耗尽内存,这就是为什么我要转向 dask。
这是 dask 无法处理的事情吗?如果它可以处理它,我是否正确处理它?具体来说,似乎 concat 正在引起问题。任何帮助表示赞赏!
更新
在玩了更多之后,我最终遇到了以下错误:
AttributeError: 'numpy.ndarray' object has no attribute 'categories'
现有的 GitHub 问题似乎与此有关 - 我询问并等待确认。
作为一种解决方法,我将所有分类列转换为字符串/对象并再次尝试,然后以
ArrowTypeError: ("Expected a bytes object, got a 'int' object, 'Conversion failed for column Account with type object')
当我检查该列时,df["Account"].dtype
它会返回dtype('O')
,所以我认为我已经有了正确的 dtype。此列中的值主要是数字,但也有一些记录只有字母。
有没有办法解决这个问题?
解决方案
在连接数据帧并将结果保存为 Parquet 格式后,我在 Pandas 中遇到了这个错误。
data = pd.concat([df_1, d2, df3], axis=0, ignore_index=True)
data.to_parquet(filename)
..显然是因为行包含不同的数据类型,int 或 float。通过在保存之前强制它们具有相同的数据类型,错误就会消失
cols = ["first affected col", "second affected col", ..]
data[cols] = data[cols].apply(pd.to_numeric, errors='coerce', axis=1)
推荐阅读
- sql-server - 我们可以看到 Azure SQL 数据库在内部用于存储 MDF/LDF 和备份文件的存储帐户吗?
- c# - C#9 不支持 JsonPropertyNameAttribute 记录?
- python - 如何在python中正确导入函数?
- c++ - 如何使用 C++ 将 XML 文件加载和读/写到 GTK+ 应用程序中?
- python - 美丽汤和 td 元素提取的问题
- machine-learning - PyTorch 由于广播而给出不正确的结果
- c# - Task.Delay 真的像 I/O 操作那样异步吗,即它是否依赖硬件和中断而不是线程?
- ruby-on-rails - Flash 消息仅显示一次(Rails 6)
- c++ - 连接一列中的所有行,其中另一列匹配/等于 sqlite 中的某些内容
- python - 如何将不同 csv 文件中的两列合并为一个 csv 文件