python - Pandas / Dask - 分组和聚合一个大的 CSV 会破坏内存和/或需要相当长的时间
问题描述
我正在尝试一个小型 POC 来尝试分组和聚合以减少 pandas 和 Dask 中大型 CSV 中的数据,并且我观察到内存使用率高和/或比我预期的处理时间慢......有人有吗python/pandas/dask noob 有什么技巧可以改善这一点吗?
背景
我有一个构建文件摄取工具的请求,该工具将:
- 能够接收几 GB 的文件,其中每行包含用户 ID 和其他一些信息
- 做一些转换
- 将数据减少到
{ user -> [collection of info]}
- 将这些数据批量发送到我们的网络服务
根据我的研究,由于文件只有几 GB,我发现 Spark 等会过大,而 Pandas/Dask 可能很适合,因此 POC。
问题
- pandas 和 Dask处理1GB csv需要大约 1 分钟,pandas 消耗1.5GB ram, dask 消耗9GB ram (!!!)
- 处理一个2GB 的 csv需要大约 3 分钟和 2.8GB 的内存来处理 pandas,Dask 崩溃了!
我在这里做错了什么?
- 对于熊猫,因为我正在处理小块的 CSV,所以我没想到 RAM 使用率会这么高
- 对于 Dask,我在网上阅读的所有内容都表明 Dask 以 指示的块处理 CSV,
blocksize
因此 ram 使用量应该是blocksize * size per block
,但是当块大小仅为 6.4MB 时,我不希望总计为 9GB。我不知道为什么对于1GB csv输入,它的 ram 使用量飙升至9GB
(注意:如果我不设置块大小,即使在输入 1GB 时也会崩溃)
我的代码
我无法共享 CSV,但它有 1 个整数列,后跟 8 个文本列。user_id
下面引用的和列都是order_id
文本列。
- 1GB csv 有 14000001 行
- 2GB csv 有 28000001 行
- 5GB csv 有 70000001 行
我用随机数据生成了这些 csv,user_id
我从 10 个预先随机生成的值中随机选择了列,所以我希望最终输出是 10 个用户 ID,每个用户 ID 都有一个谁知道有多少订单 ID 的集合。
熊猫
#!/usr/bin/env python3
from pandas import DataFrame, read_csv
import pandas as pd
import sys
test_csv_location = '1gb.csv'
chunk_size = 100000
pieces = list()
for chunk in pd.read_csv(test_csv_location, chunksize=chunk_size, delimiter='|', iterator=True):
df = chunk.groupby('user_id')['order_id'].agg(size= len,list= lambda x: list(x))
pieces.append(df)
final = pd.concat(pieces).groupby('user_id')['list'].agg(size= len,list=sum)
final.to_csv('pandastest.csv', index=False)
达斯克
#!/usr/bin/env python3
from dask.distributed import Client
import dask.dataframe as ddf
import sys
test_csv_location = '1gb.csv'
df = ddf.read_csv(test_csv_location, blocksize=6400000, delimiter='|')
# For each user, reduce to a list of order ids
grouped = df.groupby('user_id')
collection = grouped['order_id'].apply(list, meta=('order_id', 'f8'))
collection.to_csv('./dasktest.csv', single_file=True)
解决方案
该groupby
操作很昂贵,因为dask
将尝试在工作人员之间打乱数据以检查谁拥有哪些user_id
值。如果user_id
有很多独特的值(听起来像),则需要在工作人员/分区之间进行大量交叉检查。
至少有两种方法可以摆脱它:
- 设置
user_id
为索引。这在索引阶段会很昂贵,但后续操作会更快,因为现在 dask 不必检查每个分区的值user_id
。
df = df.set_index('user_id')
collection = df.groupby('user_id')['order_id'].apply(list, meta=('order_id', 'f8'))
collection.to_csv('./dasktest.csv', single_file=True)
- 如果您的文件具有您知道的结构,例如,作为一个极端示例,如果
user_id
进行了某种排序,则首先 csv 文件仅包含user_id
以 1 开头的值(或 A,或使用任何其他符号),然后是 2等,然后您可以使用该信息以groupby
仅在这些“块”中需要的方式在“块”(松散术语)中形成分区。
推荐阅读
- javascript - 根据使用 Google Sheets Apps 脚本在未知行数内过滤关键字移动特定行
- python - 如何绘制点以显示点网格?
- amazon-web-services - CloudFormation:Fn::GetAtt:不适用于 AWS::RDS::DBInstance 上的 Endpoint.Address
- r - 用两种不同的颜色填充 stat_ecdf
- sql-server - SQL Server 2019 Polybase 错误 - 无法为 SQL Server 2019 创建外部数据源
- python-3.x - 如何使用python获取上传到sharepoint的文件的动态url?
- python - Python - curve_fit 应用于缺少数据的表
- javascript - 使用 vanilla JavaScript 模拟选项卡并输入密钥
- python - Flask + gunicorn 代理设置?
- c# - 如何在 windows 10 中从 TwinCAT 获取符号?