首页 > 解决方案 > 如何不在 KafkaListener 中提交偏移量

问题描述

我正在使用 KafkaListener 并希望根据消息是否成功处理来控制 Offsets 的提交。为此,我使用下面的代码,但它在 nack(index, sleep) 上引发异常:

    @KafkaListener(topics="topic-name", groupId="group"){
    someMethod(@Header(KafkaHeaders.OFFSET) int offset, Acknowlegement ack, String message){
       try{
           processMessage(message);
           ack.acknowledge();
       }catch(Exception e){
           ack.nack(offset, 1000);
       }
    }

我已将 ackMode 设置为 Manual 并且 Auto Commit 属性设置为 false。消息处理失败时 nack 抛出异常:

      Exception: nack(index, sleep) is not supported by this Acknowledgement.

也欢迎任何其他处理该场景的方式。

标签: apache-kafkaspring-kafka

解决方案


nack(index, sleep)用于批处理侦听器List<String> messages。索引是告诉容器列表中的哪条消息失败了。

对于记录侦听器使用nack(sleep)- 容器已经知道哪个记录失败。

请参阅文档

从 2.3 版开始,Acknowledgement 接口有两个额外的方法nack(long sleep)nack(int index, long sleep). 第一个用于记录侦听器,第二个用于批处理侦听器。为您的侦听器类型调用错误的方法将引发IllegalStateException.

使用记录侦听器,当调用 nack() 时,将提交任何未决的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下一次轮询时重新传递失败的记录和未处理的记录( )。通过设置 sleep 参数,消费者线程可以在重新交付之前暂停。这与在容器配置了 SeekToCurrentErrorHandler 时引发异常的功能类似。

使用批处理侦听器时,您可以指定批处理中发生故障的索引。当调用 nack() 时,将为索引之前的记录提交偏移量,并在分区上执行失败和丢弃记录的搜索,以便在下一次 poll() 时重新传递它们。这是对 SeekToCurrentBatchErrorHandler 的改进,后者只能寻找整个批次进行重新投递。


推荐阅读