parquet - 在单个多核机器上索引大型 dask 数据帧时的内存使用情况
问题描述
我正在尝试将Wikipedia CirrusSearch 转储转储为 Parquet 支持的 dask 数据帧,该数据帧由 450G 16 核 GCP 实例上的标题索引。CirrusSearch 转储以单个 json 行格式文件的形式出现。英文 Wipedia 转储包含 5M 记录,压缩为 12G,扩展为 90+G。一个重要的细节是记录并不完全平坦。
最简单的方法是
import json
import dask
from dask import bag as db, dataframe as ddf
from toolz import curried as tz
from toolz.curried import operator as op
blocksize=2**24
npartitions='auto'
parquetopts=dict(engine='fastparquet', object_encoding='json')
lang = 'en'
wiki = 'wiki'
date = 20180625
path='./'
source = f'{path}{lang}{wiki}-{date}-cirrussearch-content.json'
(
db
.read_text(source, blocksize=blocksize)
.map(json.loads)
.filter(tz.flip(op.contains, 'title'))
.to_dataframe()
.set_index('title', npartitions=npartitions)
.to_parquet(f'{lang}{wiki}-{date}-cirrussearch.pq', **parquetopts)
)
第一个问题是默认调度器只使用一个内核。这个问题可以通过明确使用分布式或多处理调度程序来避免。
我尝试过的所有调度程序和设置的更大问题是内存使用。似乎 dask 在索引时尝试将整个数据帧加载到内存中。即使是 450G 的 RAM 也不够用。
- 如何减少此任务的内存使用量?
- 如何在不进行反复试验的情况下估算所需的最小内存?
- 有更好的方法吗?
解决方案
为什么 Dask 只使用一个核心?
这其中的 JSON 解析部分可能是 GIL 绑定的,您想使用进程。但是,当您最终计算某些内容时,您正在使用数据帧,通常假设计算会释放 GIL(这在 Pandas 中很常见),因此默认情况下它使用线程后端。如果您主要受 GIL 解析阶段的约束,那么您可能想要使用多处理调度程序。这应该可以解决您的问题:
dask.config.set(scheduler='multiprocessing')
如何避免在 set_index 阶段使用内存
是的,set_index 计算需要完整的数据集。这是一个难题。如果您使用的是单机调度程序(您似乎正在这样做),那么它应该使用核外数据结构来执行此排序过程。我很惊讶它的内存不足。
如何在不进行反复试验的情况下估算所需的最小内存?
不幸的是,很难用任何语言估计内存中类似 JSON 的数据的大小。使用平面模式要容易得多。
有更好的方法吗?
这并不能解决您的核心问题,但您可能会考虑在尝试对所有内容进行排序之前以 Parquet 格式暂存数据。然后尝试dd.read_parquet(...).set_index(...).to_parquet(...)
孤立地做。这可能有助于隔离一些成本。
推荐阅读
- java - Spring Boot Oauth2 Apache Kafka - loadUserByUsername 与 Kafka 主题
- .net-core - 在 WebApi 中更新子实体的最佳做法是什么?
- verilog - 在verilog中替代“不能在函数声明上设置范围和类型”?
- c# - windows/amd64 Server 2016 没有匹配的清单
- r - 使用 SparkR 删除只有 NA 值的列
- javascript - 在没有 toString() JavaScript 方法的情况下构建十进制到十六进制转换器
- react-native - 为什么在我使用 propTypes onPress 后我的函数没有被调用
- rust - 如何解决导致错误的闭包问题[E0495]:无法推断出适当的生命周期
- passport.js - 护照与环回4集成?
- laravel - 如何在 github 工作流 CI/CD 中设置数据库服务容器?