首页 > 解决方案 > 在 Spring Boot 中重新启动侦听器后无法读取未提交的偏移量

问题描述

我正在尝试根据某些条件在 Kafka 中提交偏移量。这是我的监听器代码。

@KafkaListener(topics = "test")
public void getTopics(@RequestBody String emp,Acknowledgment acknowledgment) {
    
Employee model = gson.fromJson(emp, Employee.class);
if(model.getId()%2!=0)
{
    System.out.println("Kafka event consumed is: " + emp);
    acknowledgment.acknowledge();
}

else {
    System.out.println("Model converted value: " + model); 
}
}

这是我的 application.prop 文件配置

spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.auto-offset-reset=earliest

起初,当我启动 springboot 应用程序时,我在主题中传递了一些甚至是员工 ID。然后我在做出相反的条件后重新启动我的程序。if(model.getId()%2==0)我能够获取我没有先提交的值。

我通过添加spring.kafka.consumer.enable-auto-commit=falseapplication.properties 重试相同的过程。但是这次消费者不会更早地重试。我认为我应该能够得到我之前得到的东西,在第一种情况下它不应该作为自动工作-commit 应该是错误的提交。感谢您的任何建议。

标签: spring-kafka

解决方案


推荐阅读