scala - Kafka Structured Streaming application throws IllegalStateException when there is a gap in offsets
Problem description
I have a Structured Streaming application running with Kafka on Spark 2.3;
the version of "spark-sql-kafka-0-10_2.11" is 2.3.0.
The application starts reading messages and processes them successfully, but after reaching a specific offset (shown in the exception message) it throws the following exception:
java.lang.IllegalStateException: Tried to fetch 666 but the returned record offset was 665
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.org$apache$spark$sql$kafka010$InternalKafkaConsumer$$fetchData(KafkaDataConsumer.scala:297)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:163)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer$$anonfun$get$1.apply(KafkaDataConsumer.scala:147)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:109)
at org.apache.spark.sql.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:147)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:54)
at org.apache.spark.sql.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:362)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:151)
at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:142)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:52)
at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:935)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2074)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
It always fails at the same offset, and this appears to be caused by a gap in the offsets: in the Kafka UI I can see that offset 665 is followed by 667 (666 was skipped for some reason), while the Kafka client in my Structured Streaming application tries to fetch 666 and fails.
After digging into Spark's source code, I found that this exception was not expected to occur (according to the comments):
So I'm wondering: am I doing something wrong, or is this a bug in the specific version I'm using?
Solution
There is a long-standing issue in Spark, fixed in Spark 2.4, that caused an impedance mismatch between Kafka and Spark. Part of the fix was backported to Spark 2.3.1, but it is only enabled when the configuration option spark.streaming.kafka.allowNonConsecutiveOffsets is set to true. As you observed, you most likely hit a part that was not backported, in which case upgrading to Spark 2.4 may be worth considering.
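If you are on Spark 2.3.1 and want to try the partially backported fix before upgrading, the flag can be set when building the session. A minimal sketch; the app name, broker address, and topic are placeholders for illustration, and whether this flag covers your exact failure depends on which parts of the fix were backported:

```scala
import org.apache.spark.sql.SparkSession

// The flag named in the answer above; on Spark 2.3.1 the backported
// portion of the fix is only active when this is set to true.
val spark = SparkSession.builder()
  .appName("kafka-gap-tolerant-stream") // hypothetical app name
  .config("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
  .getOrCreate()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker address
  .option("subscribe", "my-topic")                      // hypothetical topic
  .load()
```

Note that offset gaps like this are normal on compacted topics or topics with transactional producers (aborted transactions and control records occupy offsets that are never delivered), which is part of why Spark 2.4 changed the consumer to tolerate non-consecutive offsets.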