首页 > 解决方案 > 在火花结构化流中手动提交检查点目录中的偏移量

问题描述

我正在编写一个从 Kafka 读取的火花流应用程序。我的 kafka 每天至少有 100 万个事件。我必须对收到的每个事件进行大量计算,并且需要检查点正在处理的每个事件。这样万一发生任何故障,它就不会再次处理任何事件,我会从上次失败的事件开始

此外,由于我的 kafka 已经拥有数十亿的价值,所以它不会在检查点提交,直到第一批完成并且会从头开始重新开始,因为检查点目录中没有提交。

需要某种方式来检查我处理的每个事件。

df
    .writeStream
    .foreachBatch {
      (batchDF: Dataset[CoreDBVersion], batchId: Long) => {
        batchDF.collect.foreach {
          implicit value=> {
            //do all the processing and get required metadata
            }
//           //as I have processed a event here need to checkpoint this event with offset on checkpoint location
   
          }
        }
      }
    }
    .trigger(Trigger.ProcessingTime("1 second"))
    .option("checkpointLocation", "path to checkpoint dir")
    .start().awaitTermination()

标签: apache-sparkapache-kafkaapache-spark-sqlspark-streaming

解决方案


推荐阅读