首页 > 解决方案 > Kafka Streaming 应用程序在与 Kafka 连接后仅读取最新消息

问题描述

我们正在使用 Kafka Streaming 库为 Kafka 主题上的传入消息构建实时通知类型的系统,因此在流式应用程序运行时,它会实时处理主题中的所有传入消息,并在遇到某种类型时发送通知预定义的传入消息。

如果流媒体应用程序关闭并再次启动,我们需要仅处理在流媒体应用程序初始化后到达的最近消息。这是为了避免处理流式应用程序未运行或关闭时未处理的旧记录。默认情况下,流应用程序开始处理自上次提交偏移量以来的旧消息。Kafka Streaming App 中是否有任何设置允许仅处理最新消息?

标签: apache-kafkastreaming

解决方案


KafkaConsumer 的“auto.offset.reset”默认值为“最新”,但您想使用 KafkaStreams,默认为“最早”参考:https ://github.com/apache/kafka/blob/trunk/streams/src/main/ java/org/apache/kafka/streams/StreamsConfig.java#L634

因此,如果 set auto.offset.reset 是“最新的”,它将是您想要的。


推荐阅读