首页 > 解决方案 > 卡夫卡消费者在重启时跳过消息

问题描述

我有一个 Kafka 集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序停止运行时推送到主题的消息。

当应用程序启动时,我可以看到它读取了带有偏移量的消息100,然后将偏移量推101送到__consumer_offsets. 然后,当应用程序关闭时,带有偏移量的消息101, 102 and 103被推送到主题。重新启动应用程序后,它会读取101并将其偏移量设置为104,从而跳过102 and 103

这是我的配置:

config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaCluster);
config.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);

标签: apache-kafkakafka-consumer-api

解决方案


查看可用信息,您的消费者似乎没有跳过任何消息。

使用 offset 消费消息后100,内部主题__consumer_offsets存储101此消费者的偏移量。偏移量是消费者将在该主题上阅读101下一个偏移量。

重启后,主题中还有 3 条消息,消费者开始处理偏移量101,并且还应该随后处理其他消息。但是,根据您的提交策略,它会向内部主题报告__consumer_offsets下一条要读取的消息有 offset 104。它不会具体说明 101、102 和 103,因为所有消息都是一次轮询的。


推荐阅读