apache-spark - spark AccumulatorV2 的问题。我是 apache spark 的新手,所以我不知道为什么会抛出这个问题
问题描述
代码张贴在这里
package sparkcore.test
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object testAccumulator {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[1]").setAppName("sparkAcc")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(ArrayBuffer(ArrayBuffer[Long](1L, 2L, 3L, 4L, 5L), ArrayBuffer[Long](1L, 2L, 3L, 4L, 5L)))
var acc = new NotNullField
sc.register(acc)
rdd.foreach(
text =>{
acc.add(text)
}
)
println(acc.value)
}
class NotNullField extends AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] {
val arrayBuffer = ArrayBuffer[Long](0L, 0L, 0L, 0L, 0L)
override def isZero: Boolean = {
var flag = true
for(i <- 0 to arrayBuffer.length-1 if flag){
if (arrayBuffer(i) != 0) flag = false
}
flag
}
override def copy(): AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]] = {
new NotNullField
}
override def reset(): Unit = {
arrayBuffer.clear()
}
override def add(v: ArrayBuffer[Long]): Unit = {
for(i <- 0 to v.length - 1){
if(v(i) != 0) arrayBuffer(i) += v(i)
}
}
override def merge(other: AccumulatorV2[ArrayBuffer[Long], ArrayBuffer[Long]]): Unit = {
other match {
case o: NotNullField =>{
for(i <- 0 to arrayBuffer.length -1){
line 45:error here arrayBuffer(i) += o.arrayBuffer(i)
}
}
case _ =>
}
}
override def value: ArrayBuffer[Long] = {
arrayBuffer
}
}
}
错误很简单
Caused by: java.lang.IndexOutOfBoundsException: 0
at scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)
at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:48)
at sparkcore.test.testAccumulator$NotNullField$$anonfun$add$1.apply$mcVI$sp(testAccumulator.scala:45)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at sparkcore.test.testAccumulator$NotNullField.add(testAccumulator.scala:44)
at sparkcore.test.testAccumulator$$anonfun$main$1.apply(testAccumulator.scala:19)
at sparkcore.test.testAccumulator$$anonfun$main$1.apply(testAccumulator.scala:18)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1954)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:325)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
在此处输入图像描述 在此处输入图像描述 在此处输入图像描述第 45 行 arrayBuffer 为空,所以它抛出 IndexOutOfBoundsException,我在代码中使用 Array 而不是 ArrayBuffer,它可以工作,为什么?我知道Accumulator的原理,ArrayBuffer被复制到Executor,所以它不应该是空的。
解决方案
推荐阅读
- c++ - 我可以确定右值引用的类型吗?
- c - tolower() c 的双重输出
- php - Laravel - 如何使用 Azure AD SLO 全局注销?
- python - 使用 contourf() 更改颜色栏中颜色区域的高度
- python - 对另一个数据框中存在的值的列求和
- java - 如何使用自定义对象将 int 和 double 连接到字符串并返回另一个类中的值
- c# - DataGridView 添加来自不同类的行
- arrays - 在numpy中沿多维索引排序
- python - 获取Python中DataFrame列中值的频率
- python - 如何检查网站帖子标题中是否包含特定单词?