python - python dask to_parquet 占用大量内存
问题描述
我正在使用带有 dask 的 python 3 来读取 parquet 文件列表,进行一些处理,然后将其全部放入一个新的联合 parquet 文件中以供以后使用。
该进程使用了太多内存,以至于它似乎在将所有 parquet 文件写入新 parquet 文件之前尝试将它们读入内存。
我正在使用以下代码
def t(path):
import dask.dataframe as dd
ddf = dd.read_parquet(path)
ddf["file"] = path
return ddf
b = bag.from_sequence(parquet_files)
with ProgressBar():
data = b.map(lambda x: t(x)).\
map(lambda y: dd.to_parquet(y, output_parquet_file, partition_on=["file"], append=True, engine="fastparquet")).\
compute(num_workers=1)
每次使用一个工作人员时,尤其是使用更多工作人员时,内存都会爆炸。这些文件很大(每个大约 1G),我试图从 csv 文件中读取信息并将它们分成 25M 块,并遇到了同样的问题。
我在这里想念什么?当迭代过程似乎在这里做正确的事情时,为什么它会尝试将所有内容加载到内存中?我怎样才能使用 dask 操作来做到这一点,而不会炸毁我在那台机器上的 128G 内存?
PS我尝试使用pyarrow引擎,但问题是附加尚未在dask中实现。
编辑:尝试了建议的解决方案:我现在试试这个代码
import dask.dataframe as dd
with ProgressBar():
dfs = [dd.read_parquet(pfile) for pfile in parquet_files]
for i, path in enumerate(parquet_files):
dfs[i]["file"] = path
df = dd.concat(dfs)
df.to_parquet(output_parquet_file)
尽管如此,内存还是会爆炸(在内存超过 200G 的系统上)
解决方案
在另一个集合的地图中使用 dask 集合方法很奇怪。您可以bag.map
像这样使用并直接调用 fastaprquet 函数,或者更好(取决于您需要执行的处理),对所有内容使用数据帧 API:
dfs = [dd.read_parquet(pfile, ...) for pfile in parquet_files]
df = dd.concat(dfs)
df.to_parquet(...)
请注意,尽管您尝试附加到单个文件(我认为),但 parquet 格式并没有真正从中受益,您也可以让 Dask 每个分区写入一个文件。
推荐阅读
- arrays - 通过在Swift中混合元素将多个数组合并为一个?
- python - 在 jupyter notebook 上安装 pyautogui 时如何修复错误?
- csv - 为带有文本字段的 csv 文件选择一个好的分隔符
- sql - 帽子是以下 SQL 查询的 LINQ 等价物吗?
- c# - 使用 Dapper 多次执行存储过程?
- python - 如何阅读 PyQt 中的所有 pdf 页面?
- airflow-scheduler - 气流不允许我将 try_number 宏转换为整数
- php - 输入复选框togle
- python-3.x - 根据另一个数据框中的值查找熊猫数据框中的间隔
- c - C中的类型声明问题(斐波那契数)