scala - 读取数据、保持数据 N 秒、写入数据(Kafka、Flink)
问题描述
应用程序从 kafka 主题中读取。每条消息必须是唯一的(忽略重复)将数据保存“N”秒,并作为单独的消息写入不同的 kafka 主题
有没有办法将消息保存'N秒'并写入kafka每条消息必须在它进来的'N'秒后写入同一个主题。
目前,我将数据保存在内存中的 json 结构中,每次收到消息时,我都会遍历我拥有的所有消息并比较时间。
当然,这不是这样做的方法
val some_consumer= new FlinkKafkaConsumer09(data_topic
, new JSONKeyValueDeserializationSchema(false), properties)
some_consumer.setStartFromLatest()
val in_stream = env.addSource(some_consumer)
.filter(!_.isNull)
.map(x => processMessage(x))
def process(x: ObjectNode){
// store message in json if not existing
// loop through entire set and compare times
// if after 'N' seconds
// write to kafka
kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))
}
解决方案
您应该将消息保持在 Flink 状态,以便它们被检查点,并且在失败的情况下将被恢复。
要对流进行重复数据删除,您可以通过使事件唯一的任何属性(即keyBy(x -> x.uniqueId)
. 然后我会使用 a KeyedProcessFunction
,并为 a 中的每个键缓冲第一个事件ValueState<Event>
。您可以使用 EventTimeTimer 或 ProcessingTimeTimer 来触发发送事件(以合适的为准)。如果去重的范围是 N 秒,那么可以在发出事件的同时清除状态。
推荐阅读
- python - ValueError:roc_curve 不支持多类格式
- javascript - MongoDB Atlas/云更新和删除不起作用
- flutter - 颤动的 webapp 不工作,但 iOS 和 Apk 文件工作
- django - django-allauth signup() 缺少 1 个必需的位置参数:“模型”
- spring-boot - 在部署 Spring Boot 应用程序时,K8s pod 处于 Evicted 状态
- c# - 大整数的 mod 到大整数的幂 c#
- c++ - 如何修复尝试绑定 ':basic_string 的错误
>&' 和 ':basic_string >' 在 cpp 中? - excel - 创建 Excel 文件不再在 AppleScript 中工作
- html - 基本的html!我想在我的页面上添加一个黑框
- javascript - 如何使用单个对象制作正确的默认道具