首页 > 解决方案 > 如何为 Spark 中的任务分配执行者?

问题描述

我的文件如下:

(1,object1)
(2,object2)
(3,object3)
...

数字是对象的 ID。我想从这个文件中计算两个对象之间的距离。这意味着我想得到如下结果:

(1,1,0.0)
(1,2,1.2)
(1,3,1.3)
...

对于每一行,第一个元素是一个对象的id,第二个元素是另一个对象的id,最后一个元素是距离。所以如果原始文件中有3行,它将生成一个文件为9线。现在的问题是我有一个包含很多行的原始文件,大小非常大,所以我求助于 Spark 使用集群进行计算。所以我使用以下代码:(假设原始文件是file1.txt)

val fileRdd = sc.textFile("hdfs://../file1.txt")
val fileRdd2 = sc.textFile("hdfs://../file1.txt")
val objects = fileRdd.collect()
val objectsArr = spark.sparkContext.broadcast(objects)
val distanceRes = fileRdd2.flatMap(x=>{
        distanceCal(x,objectsArr.value)
    }).saveAsTextFile(targetFile)

在此代码段中,函数 distanceCal 接收一个对象和一个对象数组,然后返回一个 (objectID,objectID,distance) 数组。我在这里要做的是首先将对象广播给每个执行器,然后在 flatMap 函数中进行双循环。我使用yarn提交我的工作,配置如下:

--master yarn --num-executors 6 --executor-memory 8G --executor-cores 8 --conf spark.default.parallelism=144

我使用 6 个执行者来运行这项工作。我想计算会在很多任务中完成。但是,当涉及到 saveAsTextFile 阶段时,计算只在两个执行器上运行。在这种情况下,它只运行在 dn01 和 dn07 上,而其他执行器没有任务。 在此处输入图像描述

那么,有什么问题吗?非常感谢!

更具体地说,我尝试使用以下代码来增加分区,以便在最后阶段(saveAsTextFile)同时在不同的执行器上运行。但是,它仍然使用只有两个执行器,而另一个没有任务. 那么如何更改我的代码来实现这一点呢?也就是说,在最后阶段将有更多的执行程序用于计算,而不仅仅是我的示例中的两个。

fileRdd2.repartition(1000).flatMap(...)

标签: scalaapache-sparkbroadcast

解决方案


推荐阅读