apache-kafka - 卡夫卡消费者在重启时跳过消息
问题描述
我有一个 Kafka 集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序停止运行时推送到主题的消息。
当应用程序启动时,我可以看到它读取了带有偏移量的消息100
,然后将偏移量推101
送到__consumer_offsets
. 然后,当应用程序关闭时,带有偏移量的消息101, 102 and 103
被推送到主题。重新启动应用程序后,它会读取101
并将其偏移量设置为104
,从而跳过102 and 103
。
这是我的配置:
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
config.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
解决方案
查看可用信息,您的消费者似乎没有跳过任何消息。
使用 offset 消费消息后100
,内部主题__consumer_offsets
存储101
此消费者的偏移量。偏移量是消费者将在该主题上阅读101
的下一个偏移量。
重启后,主题中还有 3 条消息,消费者开始处理偏移量101
,并且还应该随后处理其他消息。但是,根据您的提交策略,它会向内部主题报告__consumer_offsets
下一条要读取的消息有 offset 104
。它不会具体说明 101、102 和 103,因为所有消息都是一次轮询的。
推荐阅读
- css - 即使设置为填充可用空间,弹性框中的按钮也有很小的边距
- spring-kafka - 卡夫卡恰好一次消息测试与“消费-转换-生产”集成测试
- python - 做减法和加法的Python代码?
- spring - Spring Cloud Data Flow:使用过滤器作为处理器
- scala - dotty / scala3 将映射的元组类型取消映射到其组成类型
- java - 什么是 window.addFlags(Integer.MIN_VALUE)
- html - 尝试使用 css 解决长期存在的 Chrome 浏览器问题,即在打印时短暂显示页面方向选项然后将其隐藏
- javascript - Axios.delete 从数据库中删除数据时出错,“错误:请求中止”
- laravel - 跨数据库连接中断
- c - React 中的 C 套接字和 socket.io/Websockets 是否兼容?