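apache-spark - Spark Streaming fails with an error about a different Kafka topic than the one being read
Problem description
The following query reads from the Kafka topic air2008rand and writes to the topic t1: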
import org.apache.spark.sql.streaming.Trigger
(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("subscribe", "air2008rand")
  .load()
  .groupBy('value.cast("string").as('key))
  .agg(count("*").cast("string") as 'value)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest")
  .option("includeTimestamp", true)
  .option("topic", "t1")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .outputMode("update")
  .option("checkpointLocation", "/tmp/cp")
  .start)
It fails with an error that refers to a different topic (air2008m1-0, that is, partition 0 of air2008m1):
scala> 19/07/14 13:27:22 ERROR MicroBatchExecution: Query [id = 711d44b2-3224-4493-8677-e5c8cc4f3db4, runId = 68a3519a-e9cf-4a82-9d96-99be833227c0]
terminated with error
java.lang.IllegalStateException: Set(air2008m1-0) are gone.
Some data may have been missed.
Some data may have been lost because they are not available in Kafka any more; either the
data was aged out by Kafka or the topic may have been deleted before all the data in the
topic was processed. If you don't want your streaming query to fail on such cases, set the
source option "failOnDataLoss" to "false".
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.org$apache$spark$sql$kafka010$KafkaMicroBatchReader$$reportDataLoss(KafkaMicroBatchReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaMicroBatchReader.planInputPartitions(KafkaMicroBatchReader.scala:124)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)
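The behavior can be reproduced by stopping the read/write code (in the spark-shell REPL) and then re-running it.
Why is there "cross-talk" between different Kafka topics here?
Solution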
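The problem was that the checkpoint directory (/tmp/cp) still contained data from an earlier Spark streaming query that had read from air2008m1. On restart, Spark resumes from the offsets recorded in the checkpoint, so it looked for the old topic's partitions rather than starting fresh on air2008rand. The fix is to use a different checkpoint directory.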
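To check whether a checkpoint directory is carrying offsets from an earlier query, one can look at the newest file under its offsets subdirectory. The sketch below is plain Scala and can be pasted into spark-shell; the helper name latestOffsetLog is made up for this example, and it assumes the usual layout in which each batch is a file named after its batch number and the Kafka source offsets appear as a JSON line keyed by topic name.

```scala
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.util.Try

// Hypothetical helper: returns the lines of the highest-numbered batch file
// under <checkpointDir>/offsets, or Nil if there is no such directory or file.
// Assumption: batch files are named by batch number (0, 1, 2, ...).
def latestOffsetLog(checkpointDir: String): List[String] = {
  val offsetsDir = Paths.get(checkpointDir, "offsets")
  if (!Files.isDirectory(offsetsDir)) Nil
  else {
    val stream = Files.list(offsetsDir)
    try {
      val batchFiles = stream.iterator().asScala.toList
        // Keep only purely numeric names; this skips .crc and temporary files.
        .filter(p => Try(p.getFileName.toString.toLong).isSuccess)
      if (batchFiles.isEmpty) Nil
      else Files.readAllLines(batchFiles.maxBy(_.getFileName.toString.toLong)).asScala.toList
    } finally stream.close()
  }
}

// Print the newest offset log of the checkpoint directory used in the question.
latestOffsetLog("/tmp/cp").foreach(println)
```

If the printed lines mention air2008m1 rather than air2008rand, the checkpoint belongs to the earlier query and should not be reused for the new one.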
The solution was found in a comment (from @jaceklaskowski himself) on this question: [IllegalStateException] Spark Structured Streaming is terminate Streaming Query with Error
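One way to avoid reusing a stale checkpoint by accident is to derive the checkpoint path from the topics the query works on. This is only a sketch of that idea, not something the original answer prescribes; checkpointDirFor and the "-to-" naming scheme are invented for illustration.

```scala
import java.nio.file.Paths

// Hypothetical helper: one checkpoint directory per (source topic, sink topic)
// pair, so a query on a new topic never resumes from another topic's offsets.
def checkpointDirFor(baseDir: String, sourceTopic: String, sinkTopic: String): String = {
  // Replace anything outside [A-Za-z0-9._-] so the names are safe path segments.
  def safe(s: String): String = s.replaceAll("[^A-Za-z0-9._-]", "_")
  Paths.get(baseDir, s"${safe(sourceTopic)}-to-${safe(sinkTopic)}").toString
}

// Checkpoint path for the query in the question (air2008rand -> t1).
val cp = checkpointDirFor("/tmp/cp", "air2008rand", "t1")
println(cp)
```

The resulting value would then be passed to the writer as .option("checkpointLocation", cp) in place of the hard-coded "/tmp/cp". A query that later reads a different topic gets a different directory, while restarting the same query still picks up its own checkpoint.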