首页 > 解决方案 > Spark Streaming 无法读取 Kafka 偏移量

问题描述

作为概述,我正在使用 Kafka 和 Spark Streaming 运行一些测试(在我的本地):我正在阅读一个名为 的 kafka 主题poc,该主题每秒从我制作的 python 生产者接收消息,使用 spark 流。这个主题接收类似的消息(免责声明:这是模拟数据):

{"client_id": "8lab1wi9df", "product_id": "7254u", "payment_id": "h656r", "amount": 350.44, "application_date": "2021-07-12", "type": "bank_payment", "id_status": "success", "received_data": "some data"}

我过滤了消息,因此唯一写入名为主题的消息written是带有 type的消息bank_paymentwritten然后我用一个读得很好的控制台消费者检查这个主题。在它抛出以下错误之前,我让它运行了大约一个小时四十分钟:

21/11/04 19:40:07 ERROR Utils: Aborting task
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined
21/11/04 19:40:07 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 1209, attempt 0, stage 1209.0)
21/11/04 19:40:07 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 1209, attempt 0, stage 1209.0)
21/11/04 19:40:07 ERROR Executor: Exception in task 0.0 in stage 1209.0 (TID 1209)
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined
21/11/04 19:40:07 WARN TaskSetManager: Lost task 0.0 in stage 1209.0 (TID 1209, 192.168.0.16, executor driver): org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition poc-0 could be determined

还有更长的堆栈跟踪...

我的第一个猜测是 Kafka 主题written失败,因为它没有在 60 秒内收到消息,如消息所示,这似乎不太可能,因为如果发生超时,Kafka 应该等待更多时间。我想到的另一件事是流式查询以某种方式超时,因为它无法读取poc或写入消息written开始。

我使用此代码从/向 kafka 读写:

    val inputDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "poc")
      .option("enable.auto.commit", true)
      .load()
    
    // some filtering

    val query = packedDF
      .select(col("value").cast(StringType))
      .writeStream
      .format("kafka")
      .option("checkpointLocation", "myCheckpointLocation")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "written")
      .start()

    query.awaitTermination()

你们有没有人处理过这个问题?

标签: apache-sparkapache-kafkaspark-structured-streaming

解决方案


推荐阅读