首页 > 解决方案 > 在对 Dask 包中的 JSON 记录进行重复数据删除时,是否有比 .distinct() 更好的选择 - 可以减少内存使用?

问题描述

我一直在尝试处理每行包含 JSON 记录的几 GB 文本文件。在处理一些文件时,我发现了一些重复的记录。每条记录都有唯一ID(键名 UID),因此识别受骗者应该很容易。

我尝试将所有 JSON 导入 Dask 包,过滤到我感兴趣的键,然后运行 ​​.distinct()。链条看起来像这样:

def get_keys(record):
  return (r['UID'],r['DATETIME'],r['USER'],r['PAGEID'],r['ACTION'])

items = (db.read_text('/*.json')
          .map(json.loads)
          .map(get_keys)
          .distinct()
          .to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
          .compute())

items.to_csv('deduped_items.csv')

我在笔记本电脑上运行它,所以是本地客户端。最终发生的是当内存使用率达到 95% 时工作人员会重新启动。在抛出异常之前似乎重试了几次。在查看 Dask 仪表板时,我看到 bag-from-delayed 的所有分区都已处理,大部分 distinct-part 已处理,然后无法通过 distinct-agg。

如果上面的代码块可以工作,还是我总是会遇到内存限制

- 编辑

在了解不同的关键参数后,我正在重新运行它。如果有帮助,将更新结果。

.distinct(key=lambda x:x[0])

-- 编辑 2

与键不同的也耗尽了内存。我现在要重新安排这个:

   .map(get_keys)
   .to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
   .drop_duplicates()
   .compute())

现在正在运行,并且任务图看起来相同,但它似乎运行得更快。

-- 编辑 3

那也没有帮助。我开始相信相对于总记录数量的重复数量使这变得困难。粗略估计,大约有 5000 万条记录,猜测不到 10k 重复。

-- 编辑 4

我现在正在尝试将 UID 设置为索引并执行 map_partitions。我的理解是保证重复的 UID 存在于同一个分区中?

items = (db.read_text('/*.json')
          .map(json.loads)
          .map(get_keys)
          .to_dataframe(columns=['UID','TIMESTAMP','USER_ID','PID','A'])
          .set_index('UID')
          .map_partitions(lambda x: x.drop_duplicates)
          .compute())

标签: pythondask

解决方案


我相信 dask.bag 中的独特调用是相当幼稚的。它获取每个分区的唯一元素,然后将它们合并到一个分区中并获取这些不同的元素。仅当您希望结果适合内存时才合理使用。


推荐阅读