apache-spark - 如何在 Spark SQL 中加入后正确保存 Kafka 偏移检查点以重新启动应用程序
问题描述
我是 Spark 的新手,并且有一个设置,我想使用 Spark 结构化流 2.4 读取两个数据流,每个数据流来自 Kafka 主题。然后我想加入这两个流,可能需要很长的时间。
val df1 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", endpoint)
.option("subscribe", topic1)
.option("startingOffsets", <?>)
.load()
val df2 = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", endpoint)
.option("subscribe", topic2)
.option("startingOffsets", <?>)
.load()
val joineddf = df1.join(
df2,
expr(
raw"""
key == key2 AND
timestamp1 <= timestamp2 + interval 1 day
"""),
joinType = "leftOuter")
现在,当我重新启动/升级应用程序时,我想确保 a). 我的startingOffsets 足够早,因此我不会跳过加入任何事件和b)。我尽量减少必须重新加入的事件数量(尽管我可能不得不为 a 牺牲 b)。我想知道,在这种情况下检查点的最佳方法是什么?我知道我可以像这样向连接操作添加检查点:
val joineddf = df1.join(
df2,
expr(
raw"""
key == key2 AND
timestamp1 <= timestamp2 + interval 1 day
"""),
joinType = "leftOuter")
.checkpoint("/directory")
但这在启动时提供 Kafka 偏移量对我没有帮助。我也通读了这两个先前的问题[1] [2],但它们似乎只处理查询(即 writestream)上的检查点,而我关心的是检查两个不同流的读取。我不想将“最新”作为起始偏移量传递,因为我想在之前的应用程序运行中已经读取但处于“边缘”等待加入的数据将被跳过。
解决方案
您需要使用检查点writeStream
- 它会跟踪用于您的操作的所有源的偏移量,并将它们存储在检查点目录中,因此当您重新启动应用程序时,它将读取所有源的偏移量并从它们继续。您指定的偏移量readStream
仅适用于您还没有检查点目录的情况 - 在第一次查询之后,它将填充真实的偏移量,并且不会使用选项中指定的值(直到您删除检查点目录) .
阅读结构化流式处理文档以了解其工作原理。
PScheckpoint
您在上一个示例中使用的是另一回事 - 不适用于结构化流。
推荐阅读
- javascript - Mathjax 配置乳胶到中心
- go - 控制 Apache Beam 数据流管道中的并行性
- python - Microsoft Graph API:将 MSAL Python 守护程序应用程序限制为单个用户访问
- python - 为什么 Selenium 无法单击按钮列表中的按钮?
- javascript - 使用侧边栏向 Canvas 上的居中矩形添加一点填充
- javascript - typeorm 中的 find 函数返回带有 __underscores__ 的字段
- javascript - 搜索过滤html选择选项时jquery功能不起作用
- javascript - 如何将新行添加到反应表
- python - 下面的crontab不运行,但是可以在终端运行
- c++ - Mac 上的 Qt:app.exec() 阻止终端输入,如何重写这个最小示例?