首页 > 解决方案 > 是否可以在 Spark 阶段重新排序任务

问题描述

我的问题是关于 Spark 阶段中的任务顺序。

语境:

我有一个 Spark 数据框,分为 3000 个分区。分区是在一个特定的键上完成的。我mapPartitionsWithIndex用来获取id一个分区和它包含的元素数量。例如:

df.rdd
  .mapPartitionsWithIndex((i,rows) => Iterator((i,rows.size)))
  .toDF("id", "numElements")

当 Spark 在我的数据帧上运行它的计算时,我在 Spark UI 中看到(我也做了一些测试以确保是这种情况)该任务index对应于 partition ,与上面获得id的完全相同。因此,任务是按照给定执行程序上分区增加的顺序执行的。idmapPartitionsWithIndexid

我看到分区中的行数与任务的执行时间之间存在明显的相关性。由于我的数据集具有无法更改的倾斜性质,我有几个分区的元素数量(> 8000)比平均数量(〜3000)要多得多。平均分区的执行时间为 10-20 分钟,较大的可以超过 3 小时。我的一些最大的分区很高id,因此相应的任务几乎在一个阶段结束时执行。因此,其中一个 Spark Stage 在最后 5 个任务上挂起 3 小时。

问题:

有没有办法重新排序id分区,以便首先执行来自最大分区的任务?或者等效地,有没有办法改变任务的执行顺序?

笔记:

我的设置:使用 Spark-submit 运行 Mesos 的 Spark 2.2。该作业在 60 个 CPU 上运行,有 12 个执行程序,每个执行程序有 5 个 CPU。

标签: scalaapache-sparkapache-spark-sqlpartitioning

解决方案


不,那里没有。如果是这样,它现在应该在文档中。

您无法控制任务的顺序(/优先级) - 因为 Spark 任务调度程序没有定义此类顺序/优先级的接口。

Spark 的工作方式与 Informatica 不同。一个阶段 - 因此所有任务 - 必须完全完成,然后才能为给定的动作开始下一个阶段。

8000似乎需要很长时间。


推荐阅读