首页 > 解决方案 > 如何使火花流从未处理的偏移量开始?

问题描述

我正在开发一个蒸汽应用程序 POC,我从 kafka 生产者那里获取消息,并在 spark 结构化蒸汽消费者中获取这些主题并将其存储在 delta 表中。我在 S3 中使用option("checkpointLocation", checkPointdir). 我的查询是如何读取此位置以获取最新的偏移量,以防我的流失败并传递给起始偏移量 .option("startingOffsets", readvalue)

我浏览了下面的参考资料,但没有太多线索如何从 s3 读取值,或者我必须编写一个单独的 scala 程序才能从 S3 读取它。 https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.html 我的偏移文件如下所示

v1
{"batchWatermarkMs":0,"batchTimestampMs":1594923737216,"conf":{"spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider","spark.sql.streaming.flatMapGroupsWithState.stateFormatVersion":"2","spark.sql.streaming.multipleWatermarkPolicy":"min","spark.sql.streaming.aggregation.stateFormatVersion":"2","spark.sql.shuffle.partitions":"200"}}
{"json-topic":{"0":41}}

这方面的任何线索都会有所帮助

标签: scalaapache-sparkapache-kafkastreaming

解决方案


推荐阅读