首页 > 解决方案 > 如何在 Spark 的每个分区中求和

问题描述

我已经创建了类并使用该类来创建 RDD。我想计算每个分区的LoudnessRate(类成员)的总和。这个总和稍后将用于计算每个分区的平均LoudnessRate。我尝试了以下代码,但它不计算 Sum 并返回0.0。我的代码是

    object sparkBAT {
      def main(args: Array[String]): Unit = {
        val numPartitions = 3
        val N = 50
        val d = 5
        val MinVal = -10
        val MaxVal =  10
        val conf = new SparkConf().setMaster(locally("local")).setAppName("spark Sum")
        val sc = new SparkContext(conf)

        val ba = List.fill(N)(new BAT(d, MinVal, MaxVal))
        val rdd = sc.parallelize(ba, numPartitions)

        var arrSum =Array.fill(numPartitions)(0.0) // Declare Array that will hold sum for each Partition
        rdd.mapPartitionsWithIndex((k,iterator) => iterator.map(x => arrSum(k) += x.LoudnessRate)).collect()
        arrSum foreach println
      }
    }


    class BAT (dim:Int, min:Double, max:Double) extends Serializable {    
      val random = new Random()
      var position      : List[Double]      =   List.fill(dim) (random.nextDouble() * (max-min)+min )
      var velocity      :List[Double]       =   List.fill(dim)( math.random)
      var PulseRate     : Double            =   0.1
      var LoudnessRate  :Double             =   0.95
      var frequency     :Double             =   math.random
      var fitness       :Double             =   math.random
      var BestPosition  :List[Double]       =   List.fill(dim)(math.random)
      var BestFitness   :Double             =   math.random 
    }

标签: scalaapache-sparkrddpartitioning

解决方案


根据要求将我的评论更改为答案。原评论

您正在 executor JVM 中修改 arrSum 并在 dirver JVM 中打印其值。您可以将迭代器映射到单例迭代器并使用 collect 将值移动到驱动程序。此外,不要将 iterator.map 用于副作用,iterator.foreach 就是为此而设计的。

这是一个示例片段如何做到这一点。首先创建一个具有两个分区的 RDD,0 -> 1,2,3然后1 -> 4,5. 当然,您在实际代码中不需要这个,但随着sc.parallelize行为的变化取决于环境,这将始终创建统一的 RDD 来重现:

object DemoPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key match {
    case num: Int => num
  }
}
val rdd = sc
  .parallelize(Seq((0, 1), (0, 2), (0, 3), (1, 4), (1, 5)))
  .partitionBy(DemoPartitioner)
  .map(_._2)

然后是实际的技巧:

val sumsByPartition = rdd.mapPartitionsWithIndex {
  case (partitionNum, it) => Iterator.single(partitionNum -> it.sum)
}.collect().toMap
println(sumsByPartition)

输出:

Map(0 -> 6, 1 -> 9)

推荐阅读