首页 > 解决方案 > 使用 Spark 每小时使用一个 Kafka 主题

问题描述

我想批量使用 Kafka 主题,我想每小时读取 Kafka 主题并读取最新的每小时数据。

val readStream = existingSparkSession
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", hostAddress)
  .option("subscribe", "kafka.raw")
  .load()

但这总是读取前 20 个数据行,并且这些行从一开始就开始,所以这永远不会选择最新的数据行。

如何使用 scala 和 spark 每小时读取最新行?

标签: apache-sparkapache-kafkaspark-streamingspark-structured-streaming

解决方案


如果您在 Batch 模式下阅读 Kafka 消息,您需要注意记录哪些数据是新的,哪些不是您自己的。请记住,Spark 不会将任何消息提交回 Kafka,因此每次重新启动批处理作业时,它都会从头开始读取(或基于批处理查询的startingOffsets默认设置earliest

对于您希望每小时运行一次作业并且只处理前一小时到达 Kafka 的新数据的场景,您可以使用 writeStream 触发器选项Trigger.Once进行流式查询。

Databricks有一篇不错的博客Trigger.Once很好地解释了为什么流式查询应该优于批处理查询。

要点是:

“当您运行执行增量更新的批处理作业时,您通常必须弄清楚哪些数据是新的,应该处理什么,不应该处理什么。结构化流已经为您完成了这一切。”

确保您还在 writeStream 中设置了选项“checkpointLocation”。最后,您可以拥有一个简单的 cron 作业,该作业每小时提交一次流式作业。


推荐阅读