python - 在对 Dask 包中的 JSON 记录进行重复数据删除时,是否有比 .distinct() 更好的选择 - 可以减少内存使用?
问题描述
我一直在尝试处理每行包含 JSON 记录的几 GB 文本文件。在处理一些文件时,我发现了一些重复的记录。每条记录都有唯一的ID(键名 UID),因此识别受骗者应该很容易。
我尝试将所有 JSON 导入 Dask 包,过滤到我感兴趣的键,然后运行 .distinct()。链条看起来像这样:
def get_keys(record):
return (r['UID'],r['DATETIME'],r['USER'],r['PAGEID'],r['ACTION'])
items = (db.read_text('/*.json')
.map(json.loads)
.map(get_keys)
.distinct()
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.compute())
items.to_csv('deduped_items.csv')
我在笔记本电脑上运行它,所以是本地客户端。最终发生的是当内存使用率达到 95% 时工作人员会重新启动。在抛出异常之前似乎重试了几次。在查看 Dask 仪表板时,我看到 bag-from-delayed 的所有分区都已处理,大部分 distinct-part 已处理,然后无法通过 distinct-agg。
如果上面的代码块可以工作,还是我总是会遇到内存限制
- 编辑
在了解不同的关键参数后,我正在重新运行它。如果有帮助,将更新结果。
.distinct(key=lambda x:x[0])
-- 编辑 2
与键不同的也耗尽了内存。我现在要重新安排这个:
.map(get_keys)
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.drop_duplicates()
.compute())
现在正在运行,并且任务图看起来相同,但它似乎运行得更快。
-- 编辑 3
那也没有帮助。我开始相信相对于总记录数量的重复数量使这变得困难。粗略估计,大约有 5000 万条记录,猜测不到 10k 重复。
-- 编辑 4
我现在正在尝试将 UID 设置为索引并执行 map_partitions。我的理解是保证重复的 UID 存在于同一个分区中?
items = (db.read_text('/*.json')
.map(json.loads)
.map(get_keys)
.to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
.set_index('UID')
.map_partitions(lambda x: x.drop_duplicates)
.compute())
解决方案
我相信 dask.bag 中的独特调用是相当幼稚的。它获取每个分区的唯一元素,然后将它们合并到一个分区中并获取这些不同的元素。仅当您希望结果适合内存时才合理使用。