首页 > 解决方案 > 为什么在减少分区数量时火花数据帧重新分区比合并更快?

问题描述

我有一个 df 有 100 个分区,在保存到 HDFS 为 .parquet 之前,我想减少分区数,因为 parquet 文件太小(<1MB)。我在写之前添加了合并:

df.coalesce(3).write.mode("append").parquet(OUTPUT_LOC)

它可以工作,但会将过程从每个文件 2-3 秒减慢到每个文件 10-20 秒。当我尝试重新分区时:

df.repartition(3).write.mode("append").parquet(OUTPUT_LOC)

这个过程一点也不慢,每个文件2-3s。

为什么?在减少分区数量时不应该总是更快地合并,因为它避免了完全洗牌?

背景:

我正在将文件从本地存储导入到 spark 集群,并将生成的数据帧保存为 parquet 文件。每个文件大约 100-200MB。文件位于“spark-driver”机器上,我在客户端部署模式下运行 spark-submit。我在驱动程序中一一读取文件:

data = read_lines(file_name)
rdd = sc.parallelize(data,100)
rdd2 = rdd.flatMap(lambda j: myfunc(j))
df = rdd2.toDF(mySchema)
df.repartition(3).write.mode("append").parquet(OUTPUT_LOC)

Spark 版本是 3.1.1

Spark/HDFS 集群有 5 个 8CPU、32GB RAM 的 worker

每个执行器有 4 个内核和 15GB 内存,总共有 10 个执行器。

编辑:

当我使用 coalesce(1) 时,我得到 spark.rpc.message.maxSize 超出限制错误,但在我使用 repartition(1) 时没有。这可能是一个线索吗?

附加 DAG 可视化 .. 看起来 WholeStageCodegen 部分在合并 DAG 上花费的时间太长?

合并

合并查询

重新分区

重新分区查询

标签: apache-sparkhdfs

解决方案


如果您的数据分布不均匀,有时会发生这种情况,并且当您合并时,它会尝试通过组合小分区来减少分区以减少完全洗牌,但其中一个分区和单个分区中仍然可能存在一些数据倾斜将花费大部分时间。

当您进行重新分区时,数据几乎均匀地分布在所有分区上,因为它会进行完全洗牌,并且所有任务几乎会同时完成。

您可以使用 spark UI 来查看为什么当您合并任务方面正在发生的事情时,您是否看到任何单个任务运行时间很长。


推荐阅读