A problem with Spark AccumulatorV2. I'm new to Apache Spark, so I don't know why this exception is thrown

Problem description

The code is posted here:

package sparkcore.test

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object testAccumulator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("sparkAcc")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(ArrayBuffer(ArrayBuffer[Long](1L, 2L, 3L, 4L, 5L), ArrayBuffer[Long](1L, 2L, 3L, 4L, 5L)))

    var acc = new NotNullField
    sc.register(acc)

    rdd.foreach(
      text => {
        acc.add(text)
      }
    )
    println(acc.value)
  }

  class NotNullField extends AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] {
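    // five-slot buffer of element-wise sums; the driver and each executor copy hold their own instance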
    val arrayBuffer = ArrayBuffer[Long](0L, 0L, 0L, 0L, 0L)
    override def isZero: Boolean = {
      var flag = true
      for(i <- 0 to arrayBuffer.length-1 if flag){
        if (arrayBuffer(i) != 0) flag = false
      }
      flag
    }

    override def copy(): AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] = {
      new NotNullField
    }

    override def reset(): Unit = {
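      // clear() shrinks the buffer to length 0; note that the copy shipped to each executor is reset first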
      arrayBuffer.clear()
    }

    override def add(v: ArrayBuffer[Long]): Unit = {
      for(i <- 0 to v.length - 1){
        if(v(i) != 0) arrayBuffer(i) += v(i)
      }
    }

    override def merge(other: AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]]): Unit = {
      other match {
        case o: NotNullField =>{
          for (i <- 0 to arrayBuffer.length - 1) {
            arrayBuffer(i) += o.arrayBuffer(i) // line 45: error here
          }
        }
        case _  =>
      }
    }

    override def value: ArrayBuffer[Long] = {
      arrayBuffer
    }
  }
}

The error is simple:

Caused by: java.lang.IndexOutOfBoundsException: 0
    at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
    at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:48)
    at sparkcore.test.testAccumulator$NotNullField$$anonfun$add$1.apply$mcVI$sp(testAccumulator.scala:45)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at sparkcore.test.testAccumulator$NotNullField.add(testAccumulator.scala:44)
    at sparkcore.test.testAccumulator$$anonfun$main$1.apply(testAccumulator.scala:19)
    at sparkcore.test.testAccumulator$$anonfun$main$1.apply(testAccumulator.scala:18)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

At line 45, arrayBuffer is empty, so it throws the IndexOutOfBoundsException. If I use an Array instead of an ArrayBuffer in the code, it works. Why? I know the principle of accumulators: the ArrayBuffer is copied to the executor, so it should not be empty.

Tags: apache-spark

Solution
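
The root cause is reset(). A registered accumulator is not serialized to the executors as-is: AccumulatorV2.writeReplace calls copyAndReset(), which invokes copy() and then reset() on the copy, and asserts that the result isZero. Here reset() calls arrayBuffer.clear(), so every executor receives a copy whose buffer has length 0. The isZero check still passes, because the loop over an empty buffer never runs and flag stays true. The first add on the executor then evaluates arrayBuffer(0) on that empty buffer, which is exactly the IndexOutOfBoundsException: 0 reported at testAccumulator.scala:45.

That also explains why an Array works: an Array has a fixed length and no clear(), so a reset() that merely zeroes the elements keeps all five slots, and the indexed updates in add and merge stay in bounds.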

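A minimal fix, sketched below under the same class name: keep the buffer's length invariant by zeroing the slots in reset() instead of clearing them, and (while here) let copy() actually copy the current contents. Only the accumulator class changes; the driver code in main stays the same.

  class NotNullField extends AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] {
    val arrayBuffer = ArrayBuffer[Long](0L, 0L, 0L, 0L, 0L)

    // zero means every slot is 0, regardless of how we got there
    override def isZero: Boolean = arrayBuffer.forall(_ == 0L)

    override def copy(): AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] = {
      val acc = new NotNullField
      // copy the current contents so copy() really is a copy
      for (i <- arrayBuffer.indices) acc.arrayBuffer(i) = arrayBuffer(i)
      acc
    }

    // zero the slots but keep the length: add() and merge() index into the buffer
    override def reset(): Unit =
      for (i <- arrayBuffer.indices) arrayBuffer(i) = 0L

    override def add(v: ArrayBuffer[Long]): Unit =
      for (i <- v.indices if v(i) != 0L) arrayBuffer(i) += v(i)

    override def merge(other: AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]]): Unit =
      other match {
        case o: NotNullField =>
          for (i <- arrayBuffer.indices) arrayBuffer(i) += o.arrayBuffer(i)
        case _ =>
      }

    override def value: ArrayBuffer[Long] = arrayBuffer
  }

With this reset(), copyAndReset() ships five zeros instead of an empty buffer, so for the sample data the job should print ArrayBuffer(2, 4, 6, 8, 10).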
