spring-boot - Spring Kafka在消费者重试完成后产生消息
问题描述
我有一个 kafka 消费者,它使用来自主题的消息并处理它们。我已经应用了 5 次重试尝试来处理消息并将该消息写入另一个主题以供将来参考,以防即使在 5 次尝试后处理仍然失败。我的代码如下所示:
卡夫卡消费者配置:
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Object, Object>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
return factory;
}
卡夫卡消费者:
@Value("${kafka.consumer.failed.topic}")
private String failedTopic;
@KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.groupId}", containerFactory = "kafkaManualAckListenerContainerFactory")
public void processMessage(String kafkaMessage) throws Exception {
log.info("parsing new kafka message {}", kafkaMessage);
TransactionDTO transactionDTO = TransformUtil.fromJson(kafkaMessage, TransactionDTO.class);
try {
service.parseTransactionDTO(transactionDTO);
} catch (Exception e) {
log.error(e.getMessage());
kafkaTemplate.send(failedTopic, kafkaMessage);
Thread.sleep(10000);
throw e;
}
}
消费者在 5 次尝试中正确尝试处理,延迟 10 秒,但每次都失败,一条新消息被写入失败的主题。有没有办法,当所有重试尝试都用尽时,消息只写入失败的主题一次,而不是每次失败时都写入?
解决方案
factory.setErrorHandler(new SeekToCurrentErrorHandler(null, 5));
在错误处理程序中使用 aDeadLetterPublishingRecoverer
而不是。null
它是在 2.2 中引入的;我不确定您使用的是什么版本。如果较旧,您可以升级(当前版本是 2.5.3),或者您可以将发布代码移动到自定义恢复器中。
较新的版本(自 2.3 起)允许在重试尝试之间添加回退延迟。