首页 > 解决方案 > 如何在 Kafka 消费者重试用尽时设置确认

问题描述

我有一个 Kafka 消费者,它重试了 5 次,我正在使用 Spring Kafka 和重试模板。现在,如果所有重试都失败了,那么在这种情况下如何确认工作。另外,如果我已将确认模式设置为手动,那么如何确认这些消息

消费者

@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setRetryTemplate(retryTemplate);
    factory.setRecoveryCallback(context -> {
        log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
        Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
        ack.acknowledge();
        return null;
    });
    factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
    return factory;
}

卡夫卡监听器

@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload, Acknowledgment acknowledgment) throws Exception {
    KafkaSegmentTrigger kafkaSegmentTrigger;
    kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
    log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
    processMessage(kafkaSegmentTrigger);
    acknowledgment.acknowledge();
}

标签: spring-bootspring-kafkaspring-retry

解决方案


如果您有一个RecoveryCallback在重试用尽时处理记录,并且您没有使用手动确认,则容器将提交偏移量。

如果您使用手动确认,您可以在恢复回调中提交偏移量。

传递给回调的上下文对象有一个属性RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT(“确认”)。

它也有CONTEXT_CONSUMERCONTEXT_RECORD


推荐阅读