scala - 如何在 Spark 的每个分区中求和
问题描述
我已经创建了类并使用该类来创建 RDD。我想计算每个分区的LoudnessRate(类成员)的总和。这个总和稍后将用于计算每个分区的平均LoudnessRate。我尝试了以下代码,但它不计算 Sum 并返回0.0。我的代码是
object sparkBAT {
def main(args: Array[String]): Unit = {
val numPartitions = 3
val N = 50
val d = 5
val MinVal = -10
val MaxVal = 10
val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
val sc = new SparkContext(conf)
val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
val rdd = sc.parallelize(ba, numPartitions)
var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
arrSum foreach println
}
}
class BAT (dim:Int, min:Double, max:Double) extends Serializable {
val random = new Random()
var position : List[Double] = List.fill(dim) (random.nextDouble() * (max-min)+min )
var velocity :List[Double] = List.fill(dim)( math.random)
var PulseRate : Double = 0.1
var LoudnessRate :Double = 0.95
var frequency :Double = math.random
var fitness :Double = math.random
var BestPosition :List[Double] = List.fill(dim)(math.random)
var BestFitness :Double = math.random
}
解决方案
根据要求将我的评论更改为答案。原评论
您正在 executor JVM 中修改 arrSum 并在 dirver JVM 中打印其值。您可以将迭代器映射到单例迭代器并使用 collect 将值移动到驱动程序。此外,不要将 iterator.map 用于副作用,iterator.foreach 就是为此而设计的。
这是一个示例片段如何做到这一点。首先创建一个具有两个分区的 RDD,0 -> 1,2,3
然后1 -> 4,5
. 当然,您在实际代码中不需要这个,但随着sc.parallelize
行为的变化取决于环境,这将始终创建统一的 RDD 来重现:
object DemoPartitioner extends Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = key match {
case num: Int => num
}
}
val rdd = sc
.parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
.partitionBy(DemoPartitioner)
.map(_._2)
然后是实际的技巧:
val sumsByPartition = rdd.mapPartitionsWithIndex {
case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)
输出:
Map(0 -> 6, 1 -> 9)
推荐阅读
- react-native - 无法在设备/模拟器上运行 react-native-cli starter android 应用程序
- angular - 来自 Github 的 Angular 模块,如何集成到我的模块中?
- php - 为什么将日期转换为格式时出错?
- windows - 自动化 Windows 10 中的默认应用程序
- python - “将 fasttext_pybind 导入为 fasttext”获取 ImportError:generic_type:类型“args”已注册
- r - 如何在具有不同轴大小的 R 中绘图时激活“保持打开”?
- python - 从 lambda 的 API 网关重定向到 IAM Auth API
- python - 新人开户时如何默认显示余额为零?
- c++ - 什么是真正的分享?
- python - 遍历两个列表中的元素