spring-kafka - 在 Spring Boot 中重新启动侦听器后无法读取未提交的偏移量
问题描述
我正在尝试根据某些条件在 Kafka 中提交偏移量。这是我的监听器代码。
@KafkaListener(topics = "test")
public void getTopics(@RequestBody String emp,Acknowledgment acknowledgment) {
Employee model = gson.fromJson(emp, Employee.class);
if(model.getId()%2!=0)
{
System.out.println("Kafka event consumed is: " + emp);
acknowledgment.acknowledge();
}
else {
System.out.println("Model converted value: " + model);
}
}
这是我的 application.prop 文件配置
spring.kafka.listener.ack-mode=manual-immediate
spring.kafka.consumer.auto-offset-reset=earliest
起初,当我启动 springboot 应用程序时,我在主题中传递了一些甚至是员工 ID。然后我在做出相反的条件后重新启动我的程序。if(model.getId()%2==0)
我能够获取我没有先提交的值。
我通过添加spring.kafka.consumer.enable-auto-commit=false
application.properties 重试相同的过程。但是这次消费者不会更早地重试。我认为我应该能够得到我之前得到的东西,在第一种情况下它不应该作为自动工作-commit 应该是错误的提交。感谢您的任何建议。
解决方案
推荐阅读
- json - JSON Schema OneOf 级联
- php - 如何在 Magento 中拥有一个管理控制器并在 phhtml 中使用它来上传文件
- android - 带有 Facebook SDK 的 Android Espresso AppNotIdleException
- xpath - Xquery将元素的所有“内容”(文本()+所有子节点)复制到新元素中
- c# - 调用 Execute 方法在正常代码执行中返回 Null,但在单步执行时返回数据
- javascript - 从 d3 比例和颜色插值返回 RGB 对象而不是字符串
- ios - 我可以在 Swift 5 和 IOS 12 中以编程方式更改 iOS 屏幕壁纸吗
- c++ - 手动链接时 Clang 不生成 profraw 文件
- android - 如何使用 Android 矢量绘图作为活动的背景?
- ffmpeg - 为什么没有 dts pts 写入我的 mp4 容器