apache-spark - 在火花结构化流中手动提交检查点目录中的偏移量
问题描述
我正在编写一个从 Kafka 读取的火花流应用程序。我的 kafka 每天至少有 100 万个事件。我必须对收到的每个事件进行大量计算,并且需要检查点正在处理的每个事件。这样万一发生任何故障,它就不会再次处理任何事件,我会从上次失败的事件开始
此外,由于我的 kafka 已经拥有数十亿的价值,所以它不会在检查点提交,直到第一批完成并且会从头开始重新开始,因为检查点目录中没有提交。
需要某种方式来检查我处理的每个事件。
df
.writeStream
.foreachBatch {
(batchDF: Dataset[CoreDBVersion], batchId: Long) => {
batchDF.collect.foreach {
implicit value=> {
//do all the processing and get required metadata
}
// //as I have processed a event here need to checkpoint this event with offset on checkpoint location
}
}
}
}
.trigger(Trigger.ProcessingTime("1 second"))
.option("checkpointLocation", "path to checkpoint dir")
.start().awaitTermination()
解决方案
推荐阅读
- android - 为什么我需要将我的源代码保留在 Proguard 规则中以防止崩溃?
- python - 使用 numpy 或 pandas 从元组列表中为二元组创建频率矩阵
- r - 堆积面积图 Y 轴不反映实际数据点:带有可重现的示例
- scala - 应该返回相同的值 3 个连续调用
- list - 如何在 Python3 中检查一个空列表
- r - 在 R 中使用管道运算符重命名变量
- c# - 下载文件时防止 UI 冻结
- javascript - React Checkbox Component返回唯一键道具错误
- javascript - 在wixsite的javascript中循环遍历数组
- java - Java Spring,如何检索记录并使用新的唯一键发布重复项?