apache-spark - 为什么在减少分区数量时火花数据帧重新分区比合并更快?
问题描述
我有一个 df 有 100 个分区,在保存到 HDFS 为 .parquet 之前,我想减少分区数,因为 parquet 文件太小(<1MB)。我在写之前添加了合并:
df.coalesce(3).write.mode("append").parquet(OUTPUT_LOC)
它可以工作,但会将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时:
df.repartition(3).write.mode("append").parquet(OUTPUT_LOC)
这个过程一点也不慢,每个文件2-3s。
为什么?在减少分区数量时不应该总是更快地合并,因为它避免了完全洗牌?
背景:
我正在将文件从本地存储导入到 spark 集群,并将生成的数据帧保存为 parquet 文件。每个文件大约 100-200MB。文件位于“spark-driver”机器上,我在客户端部署模式下运行 spark-submit。我在驱动程序中一一读取文件:
data = read_lines(file_name)
rdd = sc.parallelize(data,100)
rdd2 = rdd.flatMap(lambda j: myfunc(j))
df = rdd2.toDF(mySchema)
df.repartition(3).write.mode("append").parquet(OUTPUT_LOC)
Spark 版本是 3.1.1
Spark/HDFS 集群有 5 个 8CPU、32GB RAM 的 worker
每个执行器有 4 个内核和 15GB 内存,总共有 10 个执行器。
编辑:
当我使用 coalesce(1) 时,我得到 spark.rpc.message.maxSize 超出限制错误,但在我使用 repartition(1) 时没有。这可能是一个线索吗?
附加 DAG 可视化 .. 看起来 WholeStageCodegen 部分在合并 DAG 上花费的时间太长?
解决方案
如果您的数据分布不均匀,有时会发生这种情况,并且当您合并时,它会尝试通过组合小分区来减少分区以减少完全洗牌,但其中一个分区和单个分区中仍然可能存在一些数据倾斜将花费大部分时间。
当您进行重新分区时,数据几乎均匀地分布在所有分区上,因为它会进行完全洗牌,并且所有任务几乎会同时完成。
您可以使用 spark UI 来查看为什么当您合并任务方面正在发生的事情时,您是否看到任何单个任务运行时间很长。
推荐阅读
- python - 使用 Python 的 Telnetlib 记录 NMEA 句子并将输出重定向到文本文件
- java - java SSLHandshakeException 虽然证书受信任的系统范围
- 3d - Zbrusch Core - 无法雕刻到圆柱体上
- html - 这个定制的引导代码是否需要我缺少的东西?
- javascript - 多动作 API 最佳实践/设计模式
- laravel - 模型的关系并向用户显示余额
- javascript - 如何将流媒体服务价格数组转换为按服务长度分类的对象数组?
- php - 如果用户有名字,如何显示某些内容,如果没有,如何显示其他内容
- javascript - 不使用 getElementById 的多个元素 onclick 的 JS 函数
- javascript - 生成 PDF 并且 PDF 存储在生成它的位置