apache-spark - 使用 Spark 每小时使用一个 Kafka 主题
问题描述
我想批量使用 Kafka 主题,我想每小时读取 Kafka 主题并读取最新的每小时数据。
val readStream = existingSparkSession
.read
.format("kafka")
.option("kafka.bootstrap.servers", hostAddress)
.option("subscribe", "kafka.raw")
.load()
但这总是读取前 20 个数据行,并且这些行从一开始就开始,所以这永远不会选择最新的数据行。
如何使用 scala 和 spark 每小时读取最新行?
解决方案
如果您在 Batch 模式下阅读 Kafka 消息,您需要注意记录哪些数据是新的,哪些不是您自己的。请记住,Spark 不会将任何消息提交回 Kafka,因此每次重新启动批处理作业时,它都会从头开始读取(或基于批处理查询的startingOffsets
默认设置earliest
。
对于您希望每小时运行一次作业并且只处理前一小时到达 Kafka 的新数据的场景,您可以使用 writeStream 触发器选项Trigger.Once
进行流式查询。
Databricks有一篇不错的博客Trigger.Once
很好地解释了为什么流式查询应该优于批处理查询。
要点是:
“当您运行执行增量更新的批处理作业时,您通常必须弄清楚哪些数据是新的,应该处理什么,不应该处理什么。结构化流已经为您完成了这一切。”
确保您还在 writeStream 中设置了选项“checkpointLocation”。最后,您可以拥有一个简单的 cron 作业,该作业每小时提交一次流式作业。
推荐阅读
- reactjs - 如何修复 React useEffect 时间?
- mysql - mysql中的字符串操作
- java - VSCode 是否可以选择为 java 类“创建测试”?
- php - Elastic Beanstalk 上的 PHP 编写器
- c++ - 一个简单的 3 文件项目中的循环依赖
- python - IOError:[Errno 25] 设备的 ioctl 不合适
- html - 不同的渲染 VS IISExpress 与生产 IIS
- java - 有没有办法为不同的客户构建一个具有相同基础但具有模块化的项目,这些功能将在运行时发现?
- javascript - 我可以构建一个子集合吗?(火库)
- jquery - 如何使用 jquery 日历控件显示图标?