首页 > 解决方案 > Spring Kafka在消费者重试完成后产生消息

问题描述

我有一个 kafka 消费者,它使用来自主题的消息并处理它们。我已经应用了 5 次重试尝试来处理消息并将该消息写入另一个主题以供将来参考,以防即使在 5 次尝试后处理仍然失败。我的代码如下所示:

卡夫卡消费者配置:

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
        factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
        return factory;
    }

卡夫卡消费者:

    @Value("${kafka.consumer.failed.topic}")
    private String failedTopic;
    
    @KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
    public void processMessage(String kafkaMessage) throws Exception {
        log.info("parsing new kafka message {}", kafkaMessage);
        TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
        try {
            service.parseTransactionDTO(transactionDTO);
        } catch (Exception e) {
            log.error(e.getMessage());
            kafkaTemplate.send(failedTopic, kafkaMessage);
            Thread.sleep(10000);
            throw e;
        }
    }

消费者在 5 次尝试中正确尝试处理,延迟 10 秒,但每次都失败,一条新消息被写入失败的主题。有没有办法,当所有重试尝试都用尽时,消息只写入失败的主题一次,而不是每次失败时都写入?

标签: spring-bootapache-kafkaspring-kafka

解决方案


factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));

在错误处理程序中使用 aDeadLetterPublishingRecoverer而不是。null

它是在 2.2 中引入的;我不确定您使用的是什么版本。如果较旧,您可以升级(当前版本是 2.5.3),或者您可以将发布代码移动到自定义恢复器中。

较新的版本(自 2.3 起)允许在重试尝试之间添加回退延迟。


推荐阅读