首页 > 解决方案 > 读取数据、保持数据 N 秒、写入数据(Kafka、Flink)

问题描述

应用程序从 kafka 主题中读取。每条消息必须是唯一的(忽略重复)将数据保存“N”秒,并作为单独的消息写入不同的 kafka 主题

有没有办法将消息保存'N秒'并写入kafka每条消息必须在它进来的'N'秒后写入同一个主题。

目前,我将数据保存在内存中的 json 结构中,每次收到消息时,我都会遍历我拥有的所有消息并比较时间。

当然,这不是这样做的方法

val some_consumer= new FlinkKafkaConsumer09(data_topic
      , new JSONKeyValueDeserializationSchema(false), properties)
    some_consumer.setStartFromLatest()
    val in_stream = env.addSource(some_consumer)
      .filter(!_.isNull)
      .map(x => processMessage(x))
def process(x: ObjectNode){
 // store message in json if not existing
 // loop through entire set and compare times
 // if after 'N' seconds
   // write to kafka
    kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))


}

标签: scalaapache-kafkaapache-flink

解决方案


您应该将消息保持在 Flink 状态,以便它们被检查点,并且在失败的情况下将被恢复。

要对流进行重复数据删除,您可以通过使事件唯一的任何属性(即keyBy(x -> x.uniqueId). 然后我会使用 a KeyedProcessFunction,并为 a 中的每个键缓冲第一个事件ValueState<Event>。您可以使用 EventTimeTimer 或 ProcessingTimeTimer 来触发发送事件(以合适的为准)。如果去重的范围是 N 秒,那么可以在发出事件的同时清除状态。


推荐阅读