首页 > 解决方案 > 如何在spring kafka Listener中找出是否是重试消息

问题描述

我们正在尝试在 spring boot kafka 侦听器中解决边缘情况:

@KafkaListener(topics="topicName", groupId="consumerGoupId")
public void consumeNotification(@Payload Datarequest request,
                          @Header(KafkaHeaders.OFFSET) Long offset,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID)Integer partitionId) 
{
    processRequest(request);
}

现在由于某种原因,当 processRequest 正在进行时,我的 spring boot 应用程序会关闭,偏移量没有被提交,当它启动时,侦听器会再次接收相同的消息进行处理。

我的用例是:有没有办法知道侦听器在重试过程中收到了这条消息?

因为由于应用程序崩溃导致进程请求在两者之间停止,如果相同的消息再次被侦听器拾取,我必须编写一些逻辑processRequest以从它停止的位置开始处理。

标签: javaspring-bootapache-kafkakafka-consumer-apispring-kafka

解决方案


正确的; 唯一的保证是至少一次交货;无法知道消息是否已重新传递(重新启动后 - 我们可以判断它是否是同一应用程序实例中的重新传递 - 请参阅https://docs.spring.io/spring-kafka/docs/current/reference /html/#delivery-header)。

由于您无法在所有情况下都知道,因此您必须使您的侦听器具有幂等性 - 例如,通过将主题/分区/偏移量与数据一起存储,以便您可以检查它是否已被处理。


推荐阅读