apache-spark - 优化 Delta Lake 中的合并(Databricks 开源)
问题描述
我正在尝试使用 delta Lake oss 实现合并,我的历史数据大约有 70 亿条记录,而 delta 大约有 500 万条记录。
合并基于复合键(5 列)。
我正在启动一个 10 节点集群 r5d.12xlarge(~3TB 内存/~480 个核心)。
这项工作第一次需要 35 分钟,随后的运行需要更多时间。
尝试使用优化技术,但没有任何效果,并且在 3 次运行后我开始出现堆内存问题,我在数据洗牌时看到磁盘上有很多溢出,尝试使用合并键上的 order by 重写历史记录,提高了性能并完成了合并20 分钟,泄漏量约为 2TB,但问题是作为合并过程的一部分写入的数据顺序不同,因为我无法控制写入数据的顺序,因此后续运行需要更长的时间。
我无法在 delta Lake oss 中使用 Zorder,因为它仅附带订阅。我尝试了压缩,但这也无济于事。请让我知道是否有更好的方法来优化合并过程。
解决方案
如果你真的想通过代码来优化它,你可以启动并行任务。这是我们用来并行化 S3 编写的示例代码。您也可以对 adls 位置使用相同的逻辑。
with futures.ThreadPoolExecutor(max_workers=total_days+1) as e:
print(f"{raw_bucket}/{db}/{table}/")
for single_date in daterange(start_date, end_date):
curr_date = single_date.strftime("%Y-%m-%d")
jobs.append(e.submit(writeS3, curr_date))
for job in futures.as_completed(jobs):
result_done = job.result()
print(f"Job Completed - {result_done}")
print("Task complete")
参考:https ://docs.python.org/3/library/concurrent.futures.html
推荐阅读
- javascript - 使用 p5 库和 CreateCanvas 函数,如何在 div 类中创建画布?
- c# - 生成后如何将页码添加到 iText7 Pdf?
- python - 如何用换行符\n分割字符串
- python-3.x - 如何从头开始为简单的矩形结构生成网格点?
- tensorflow - 在训练图像分割之前无法预处理数据
- discord - 练级系统的排行榜命令?- discord.py 重写
- reactjs - 当 url 没有改变时,带有 React 的上一个按钮
- svg.js - 如何使用动画、定位和循环创建下雨效果
- python - 如何反转熊猫中特定数据框列的内容?
- spring - Spring Boot - 为什么我不能在 RestController 中生成字符串?