scala - 来自kafka的火花流如何指定轮询事件的截止时间
问题描述
我有火花流应用程序,它运行一天结束并消耗上游应用程序发送的 kafka 事件。目前上游应用程序整天都在推送新数据,而我的消费者最终消费了它。我想根据截止时间限制消耗的事件,比如每天下午 6 点。有没有办法指定截止时间来限制根据截止时间消耗的事件,比如 kafka 事件时间戳或其他东西。下面是消费者代码
KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topicSet, kafkaParams))
解决方案
您可以根据时间戳或时间或任何字段过滤掉处理过程中的事件。例如,假设您的事件是 JSON,并且它有一个名为 hour 的字段,它是事件时间小时值。您可以轻松选择仅在 6 之前创建的事件,如下所示。
directStream.foreachRDD { rdd =>
val eventDfRDD = rdd.filter(record => {
val option = JSON.parseFull(record).get.asInstanceOf[Map[String, String]]
option.get("hour") < 1800
})
}
推荐阅读
- php - Laravel setLocale 返回存储的值,但使用 config.app 默认值
- c# - 使用 Moq 将用户添加到角色
- java - 未捕获的 DOMException:无法在“节点”上执行“removeChild”:要删除的节点不是该节点的子节点
- php - 文件导入到 mySQLi - 不工作
- java - 在多个滚动中的 iframe 内滚动
- windows - Windows 10 Docker 容器客户端无法访问主机上的 SQL
- firebase - 将 Firestore 索引与 web 应用程序中的 slug 相关联
- javascript - 如何删除角度的自动对焦?
- r - plot_summs() 改变绘制回归系数的大小
- html - ./ 和 ../ 目录差异