scala - 如何为 Spark 中的任务分配执行者?
问题描述
我的文件如下:
(1,object1)
(2,object2)
(3,object3)
...
数字是对象的 ID。我想从这个文件中计算两个对象之间的距离。这意味着我想得到如下结果:
(1,1,0.0)
(1,2,1.2)
(1,3,1.3)
...
对于每一行,第一个元素是一个对象的id,第二个元素是另一个对象的id,最后一个元素是距离。所以如果原始文件中有3行,它将生成一个文件为9线。现在的问题是我有一个包含很多行的原始文件,大小非常大,所以我求助于 Spark 使用集群进行计算。所以我使用以下代码:(假设原始文件是file1.txt)
val fileRdd = sc.textFile("hdfs://../file1.txt")
val fileRdd2 = sc.textFile("hdfs://../file1.txt")
val objects = fileRdd.collect()
val objectsArr = spark.sparkContext.broadcast(objects)
val distanceRes = fileRdd2.flatMap(x=>{
distanceCal(x,objectsArr.value)
}).saveAsTextFile(targetFile)
在此代码段中,函数 distanceCal 接收一个对象和一个对象数组,然后返回一个 (objectID,objectID,distance) 数组。我在这里要做的是首先将对象广播给每个执行器,然后在 flatMap 函数中进行双循环。我使用yarn提交我的工作,配置如下:
--master yarn --num-executors 6 --executor-memory 8G --executor-cores 8 --conf spark.default.parallelism=144
我使用 6 个执行者来运行这项工作。我想计算会在很多任务中完成。但是,当涉及到 saveAsTextFile 阶段时,计算只在两个执行器上运行。在这种情况下,它只运行在 dn01 和 dn07 上,而其他执行器没有任务。
那么,有什么问题吗?非常感谢!
更具体地说,我尝试使用以下代码来增加分区,以便在最后阶段(saveAsTextFile)同时在不同的执行器上运行。但是,它仍然使用只有两个执行器,而另一个没有任务. 那么如何更改我的代码来实现这一点呢?也就是说,在最后阶段将有更多的执行程序用于计算,而不仅仅是我的示例中的两个。
fileRdd2.repartition(1000).flatMap(...)
解决方案
推荐阅读
- parsing - Rascal:用多个“_”解析字符串
- mysql - 显示一个表的结果并在另一列中查找
- java - Quartz Scheduler 远程实例的问题
- c - 编程 linux C 客户端-服务器。客户端未接收数据。本地主机
- android - Android 上的应用程序的 NDK 内存限制是多少?
- swift - 为什么 Singleton 会捕获自己的实例?
- javascript - 将道具解析为应在表格中呈现道具的组件时,道具为空
- regex - 在 ANT 正则表达式中出现错误“无法重命名临时文件 C:\Users\username\AppData\Local\Temp\replace7877977241325466040.txt”
- kubernetes - 使用 kubectl -o=jsonpath 获取就绪状态
- javascript - 将输入值传递给反应组件