首页 > 解决方案 > 如何在 Spark SQL 中加入后正确保存 Kafka 偏移检查点以重新启动应用程序

问题描述

我是 Spark 的新手,并且有一个设置,我想使用 Spark 结构化流 2.4 读取两个数据流,每个数据流来自 Kafka 主题。然后我想加入这两个流,可能需要很长的时间。

val df1 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic1)
  .option("startingOffsets", <?>) 
  .load()

val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", endpoint)
  .option("subscribe", topic2)
  .option("startingOffsets", <?>) 
  .load()

val joineddf = df1.join(
  df2,
  expr(
    raw"""
            key == key2 AND
            timestamp1 <= timestamp2 + interval 1 day
        """),
  joinType = "leftOuter")

现在,当我重新启动/升级应用程序时,我想确保 a). 我的startingOffsets 足够早,因此我不会跳过加入任何事件和b)。我尽量减少必须重新加入的事件数量(尽管我可能不得不为 a 牺牲 b)。我想知道,在这种情况下检查点的最佳方法是什么?我知道我可以像这样向连接操作添加检查点:

   val joineddf = df1.join(
  df2,
  expr(
    raw"""
            key == key2 AND
            timestamp1 <= timestamp2 + interval 1 day
        """),
  joinType = "leftOuter")
  .checkpoint("/directory")

但这在启动时提供 Kafka 偏移量对我没有帮助。我也通读了这两个先前的问题[1] [2],但它们似乎只处理查询(即 writestream)上的检查点,而我关心的是检查两个不同流的读取。我不想将“最新”作为起始偏移量传递,因为我想在之前的应用程序运行中已经读取但处于“边缘”等待加入的数据将被跳过。

标签: apache-sparkapache-spark-sql

解决方案


您需要使用检查点writeStream- 它会跟踪用于您的操作的所有源的偏移量,并将它们存储在检查点目录中,因此当您重新启动应用程序时,它将读取所有源的偏移量并从它们继续。您指定的偏移量readStream仅适用于您还没有检查点目录的情况 - 在第一次查询之后,它将填充真实的偏移量,并且不会使用选项中指定的值(直到您删除检查点目录) .

阅读结构化流式处理文档以了解其工作原理。

PScheckpoint您在上一个示例中使用的是另一回事 - 不适用于结构化流。


推荐阅读