首页 > 解决方案 > kafka 什么时候会重试处理未确认的消息?

问题描述

我有一个配置了手动 ACK 属性的消费者:

@Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageAvro> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, MessageAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

还有一个使用@KafkaListener 方法的消费者,它做了一些工作,比如:

    @KafkaListener(
            topics = "${tpd.topic-name}",
            containerFactory = "kafkaListenerContainerFactory",
            groupId = "${tpd.group-id}")
    public void messageListener(final ConsumerRecord<String, MessageAvro> msg, @Payload final MessageAvro message, final Acknowledgment ack) {
    if (someCondition) {
        // do something
        ack.acknowledge();
    } else {
       // do not acknoledge the message here in order to retry it later.
    }
}
        

如果条件为“假”并且我们继续进行“其他”部分,我的消费者何时会尝试再次阅读未确认的消息?

如果它不再这样做,我如何告诉我的@KafkaListener 考虑未确认的消息?

标签: spring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


一旦您提交(或“确认”)一个偏移量,所有先前的偏移量也会在某种意义上被提交,即 ConsumerGroup不会尝试再次读取它。

这意味着:如果您达到“else”条件并且您的作业继续以某种方式运行,它将达到“if”条件并确认所有偏移量都将被提交。

这背后的原因是 Kafka 消费者将报告给下一个要读取的偏移量的代理。为此,Kafka 将该信息存储在称为__consumer_offsets键/值对的内部 Kafka 主题中,其中键:ConsumerGroup,主题名称,分区值:要读取的下一个偏移量

该内部主题是一个压缩主题,这意味着它最终只会存储提到的键的最新值。因此,Kafka 不会跟踪介于两者之间的“未确认”消息。

解决方法

人们通常所做的是将那些“未确认”的消息分叉到另一个主题中,以便稍后可以一起检查和使用它们。这样,您就不会阻止您的实际应用程序使用更多消息,并且您可以单独处理未确认的消息。


推荐阅读