首页 > 解决方案 > Spark Structured Streaming-是否可以将偏移量写入两次

问题描述

我正在使用 spark 结构化流来使用来自 kafka 主题的数据并将数据写入另一个 kafka 接收器。

我想存储偏移量两次 - 从主题中读取一次并搅拌偏移量。其次,当将数据写入输出接收器并写入偏移量时,这可以通过提供检查点目录位置来实现,

是否可以写订阅主题时消耗的偏移量。

标签: apache-kafkaoffsetspark-structured-streamingkafka-topicspark-checkpoint

解决方案


您可以使用StreamingQueryListener。您可以通过以下方式将侦听器添加到您的流中

spark.streams.addListener(new StreamingQueryListener() {

  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = { 

    // insert code here to log the offsets in addition to Spark's checkpoint

  }

  override def onQueryProgress(event: QueryProgressEvent): Unit = {}

  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
})

推荐阅读