apache-spark - Spark Streaming 无法读取 Kafka 偏移量
问题描述
作为概述,我正在使用 Kafka 和 Spark Streaming 运行一些测试(在我的本地):我正在阅读一个名为 的 kafka 主题poc
,该主题每秒从我制作的 python 生产者接收消息,使用 spark 流。这个主题接收类似的消息(免责声明:这是模拟数据):
{"client_id": "8lab1wi9df", "product_id": "7254u", "payment_id": "h656r", "amount": 350.44, "application_date": "2021-07-12", "type": "bank_payment", "id_status": "success", "received_data": "some data"}
我过滤了消息,因此唯一写入名为主题的消息written
是带有 type的消息bank_payment
;written
然后我用一个读得很好的控制台消费者检查这个主题。在它抛出以下错误之前,我让它运行了大约一个小时四十分钟:
21/11/04 19:40:07 ERROR Utils: Aborting task
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined
21/11/04 19:40:07 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 1209, attempt 0, stage 1209.0)
21/11/04 19:40:07 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 1209, attempt 0, stage 1209.0)
21/11/04 19:40:07 ERROR Executor: Exception in task 0.0 in stage 1209.0 (TID 1209)
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined
21/11/04 19:40:07 WARN TaskSetManager: Lost task 0.0 in stage 1209.0 (TID 1209, 192.168.0.16, executor driver): org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined
还有更长的堆栈跟踪...
我的第一个猜测是 Kafka 主题written
失败,因为它没有在 60 秒内收到消息,如消息所示,这似乎不太可能,因为如果发生超时,Kafka 应该等待更多时间。我想到的另一件事是流式查询以某种方式超时,因为它无法读取poc
或写入消息written
开始。
我使用此代码从/向 kafka 读写:
val inputDF = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "poc")
.option("enable.auto.commit", true)
.load()
// some filtering
val query = packedDF
.select(col("value").cast(StringType))
.writeStream
.format("kafka")
.option("checkpointLocation", "myCheckpointLocation")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "written")
.start()
query.awaitTermination()
你们有没有人处理过这个问题?
解决方案
推荐阅读
- html - 如何通过道具将html模板传递给反应组件?
- python - MPEG-2 AAC 逐帧音频解码
- javascript - JQuery - 如何重用来自 JSON 的数据?
- vhdl - 为什么信号不在进程内部更新?
- recursion - 证明动态编程中的记忆化并不总是有帮助的反例
- swift - 从 firebase 获取与用户 uid 相关的用户名以显示评论
- python - 遍历文本文件,查找关键字,创建字典,然后将字典附加到字典列表
- mysql - 如何在 mysql 工作台中找到结果网格
- gradle - Gradle 项目命名约定
- regex - 使用 Perl 的正则表达式查找项目