scala - 是否可以在 Spark 阶段重新排序任务
问题描述
我的问题是关于 Spark 阶段中的任务顺序。
语境:
我有一个 Spark 数据框,分为 3000 个分区。分区是在一个特定的键上完成的。我mapPartitionsWithIndex
用来获取id
一个分区和它包含的元素数量。例如:
df.rdd
.mapPartitionsWithIndex((i,rows) => Iterator((i,rows.size)))
.toDF("id", "numElements")
当 Spark 在我的数据帧上运行它的计算时,我在 Spark UI 中看到(我也做了一些测试以确保是这种情况)该任务index
对应于 partition ,与上面获得id
的完全相同。因此,任务是按照给定执行程序上分区增加的顺序执行的。id
mapPartitionsWithIndex
id
我看到分区中的行数与任务的执行时间之间存在明显的相关性。由于我的数据集具有无法更改的倾斜性质,我有几个分区的元素数量(> 8000)比平均数量(〜3000)要多得多。平均分区的执行时间为 10-20 分钟,较大的可以超过 3 小时。我的一些最大的分区很高id
,因此相应的任务几乎在一个阶段结束时执行。因此,其中一个 Spark Stage 在最后 5 个任务上挂起 3 小时。
问题:
有没有办法重新排序id
分区,以便首先执行来自最大分区的任务?或者等效地,有没有办法改变任务的执行顺序?
笔记:
- 我不需要将分区移动到其他节点或执行程序,只需更改执行顺序即可。
- 我无法更改分区键
- 我可以更改分区数,但问题会一直存在
我的设置:使用 Spark-submit 运行 Mesos 的 Spark 2.2。该作业在 60 个 CPU 上运行,有 12 个执行程序,每个执行程序有 5 个 CPU。
解决方案
不,那里没有。如果是这样,它现在应该在文档中。
您无法控制任务的顺序(/优先级) - 因为 Spark 任务调度程序没有定义此类顺序/优先级的接口。
Spark 的工作方式与 Informatica 不同。一个阶段 - 因此所有任务 - 必须完全完成,然后才能为给定的动作开始下一个阶段。
8000似乎需要很长时间。
推荐阅读
- tomcat - 如果 docBase myapp.war 文件不可用,是否可以运行以前部署的 /webapp/myapp?
- julia - 如何在 Julia 中跨行查找最小非零元素的列的索引?
- heroku - 在heroku上运行时模板不存在但在本地运行时存在
- html - 仅使用 CSS 进行图像交换,并在交换后在图像上具有链接
- kubernetes - 运行 istio-proxy 后启动容器/pod
- c# - 引用异步任务而不启动它
- java - org.hibernate.AnnotationException: mappedBy 引用了一个未知的目标实体属性。出现错误:java.lang.NullPointerException
- python-3.x - 使用正则表达式在字符串中查找数字
- javascript - 在会话中强制布局保存缩放比例:d3.event.scale
- windows - bat Windows 中具有不同 Magick 命令的嵌套循环