python - 将镶木地板保存到 AWS s3 存储桶时如何优化当前的 pyspark 性能?
问题描述
在日常的管道中,我们需要对一个巨大的数据帧(约 2 亿行)执行一些计算密集型操作。输出必须保存在多个分区中:item_release_year, item_release_month, item_release_date, shop_id, execution_date
.
操作部分似乎工作正常(我知道还有很多可以改进的地方),但主要的瓶颈是当我保存所有文件时。
据我从 AWS EMR UI 了解到,计算操作完成需要 20 分钟,但整个保存拼花过程需要 2 小时。
如果我查看其中一个 parquet 步骤,它实际上表明该阶段仅 shuffle 读取 56MB 数据,这令人惊讶地花了将近 5 分钟!
这就是我们编写文件的方式:
repartition_by = [item_release_year, item_release_month, item_release_date, shop_id, execution_date]
df = df.repartition(10, *repartition_by)
df.write.mode("overwrite").partitionBy(repartition_by).parquet(output_path)
以下是火花配置:
spark = pyspark.sql.SparkSession.Builder().enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.driver.maxResultSize", "2g")
.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
有人可以给我一个提示问题可能出在哪里吗?我该如何改进它?谢谢你!
解决方案
推荐阅读
- c# - Google Cloud Vision API - 创建 Grpc.Core.Channel 时出错
- python - 我们如何在 Python 中的轮廓/边界之间绘制曲线?
- python - 使用 OpenPyxel 在 Excel 中插入多个图像不起作用
- python - tkinter 和 urllib 检查网站的问题
- python - 在 Django 中覆盖保存导致无限递归错误
- java - 如何访问字符串 json 有效负载并将其映射到 Spring 休息控制器中的对象?
- python - Google App Engine - 应用无法发送日志
- javascript - 合并快速排序时间结果异常
- unit-testing - 如何使用 Moq.Dapper 模拟 QueryMultiple
- docker - 如何从 kubernetes pod 访问 HDP 集群