scala - 使用 Spark collectionAccumulator 时出现 ConcurrentModificationException
问题描述
我正在尝试在 Azure HDInsight 按需群集上运行基于 Spark 的应用程序,并且看到许多 SparkExceptions(由 ConcurrentModificationExceptions 引起)被记录。当我启动本地 Spark 实例时,应用程序运行时没有这些错误。
我在使用累加器时看到过类似错误的报告,并且我的代码确实使用了 CollectionAccumulator,但是我在使用它的任何地方都放置了同步块,这没有什么区别。累加器相关的代码如下所示:
class MySparkClass(sc : SparkContext) {
val myAccumulator = sc.collectionAccumulator[MyRecord]
override def add(record: MyRecord) = {
synchronized {
myAccumulator.add(record)
}
}
override def endOfBatch() = {
synchronized {
myAccumulator.value.asScala.foreach((record: MyRecord) => {
processIt(record)
})
}
}
}
异常不会导致应用程序失败,但是当endOfBatch
被调用并且代码尝试从累加器中读取值时,它是空的并且processIt
永远不会被调用。
我们将HDInsight 3.6 版与 Spark 2.3.0版一起使用
18/11/26 11:04:37 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList.writeObject(ArrayList.java:770)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.rpc.netty.RequestMessage.serialize(NettyRpcEnv.scala:565)
at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:231)
at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:523)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:91)
... 13 more
以下代码是重现问题的更独立的示例。MyRecord
是一个仅包含数值的简单案例类。该代码在本地运行没有错误,但在 HDInsight 群集上会产生上述错误。
object MainDemo {
def main(args: Array[String]) {
val sparkContext = SparkSession.builder.master("local[4]").getOrCreate().sparkContext
val myAccumulator = sparkContext.collectionAccumulator[MyRecord]
sparkContext.binaryFiles("/my/files/here").foreach(_ => {
for(i <- 1 to 100000) {
val record = MyRecord(i, 0, 0)
myAccumulator.add(record)
}
})
myAccumulator.value.asScala.foreach((record: MyRecord) => {
// we expect this to be called once for each record that we 'add' above,
// but it is never called
println(record)
})
}
}
解决方案
我怀疑同步块是否真的有帮助。CustomeAccumulators 或所有其他累加器不是线程安全的。他们实际上不必这样做,因为 spark 驱动程序用于在任务完成(成功或失败)后更新累加器值的 DAGScheduler.updateAccumulators 方法仅在运行调度循环的单个线程上执行。除此之外,对于拥有自己的本地累加器引用的工作人员来说,它们是只写数据结构,而只有驱动程序才允许访问累加器的值。当你说它在本地模式下工作,因为它是单个 JVM,但在集群模式下,它们是不同的 JVM 和 java 实例,正在触发 PRC 调用以启用通信。
您的 MyRecord 对象的外观如何,如果您只是以 .value 结束您的行,而不是在其上使用迭代器,将会有所帮助。试试看嘛。
myAccumulator.value
推荐阅读
- sqlite - 使用查询从不同的表中获取信息
- android - 如何在 maxLine TextView 的最后一行应用水平渐变边缘
- python-3.x - Groupby并找出不同之处
- javascript - 5次调用后Ajax获取请求未得到响应
- curl - 通过 java 执行 AsyncHttpClient/curl 命令没有给出正确的结果
- python - 将日期范围拆分为更小的范围
- entity-framework - Entity Framework Core 的 AddRangeAsync() 在通用方法中不起作用
- kubernetes - 新的 macOS 安装 - kubectl 输出错误消息
- powershell - 正则表达式匹配不会产生预期的结果
- python - Python 爬虫仅在前几个 div 中找到所需的标签