spring-kafka - Spring-kafka - 带有 AckMode Record 和自定义 SeekToCurrent 错误处理程序的消息丢失
问题描述
我试图弄清楚如何处理消息侦听器中发生的**瞬态错误**,同时**消费来自 Kafka 主题的消息**。
我正在使用配置有 **custom errorHandler** 的 **@KafkaListener**。当 **listener throws** 和 Exception 并且 errorHandler 认为它是 *transient** 时,处理程序 **seeks** client 到 **current offset** 以便在下一次轮询时重新获取消息。
侦听器在循环中接收消息,等待解决瞬态问题,如预期的那样,但在某些情况下消息丢失了。
我用的是spring-kafka 2.2.5,请看Kafka的spring boot相关配置
spring:
kafka:
bootstrap-servers: <HOST>:<PORT>
consumer:
auto-offset-reset: earliest
enable-auto-commit: false
group-id: ConsumerGroup
properties:
max.poll.records: 1
listener:
ack-mode: record
concurrency: 1
据我所见,应用程序在以下情况下会丢失消息:
- 消息侦听器错误处理程序未启动/处理的异常将传播到KafkaMessageListenerContainer。容器属性ackOnError的默认值为true,因此容器提交偏移量。如果侦听器重新启动,则消费者组中的任何其他客户端都不再处理消息,因为偏移量已经提交并向前移动;
- 没有异常/错误传播到容器,但侦听器在前面描述的瞬态错误处理循环的中间重新启动。与前一种情况一样,消费者组中的客户端将不会重新使用该消息,因为偏移量已由容器在第一次轮询期间提交。
因此,为了解决第一个问题,将 ackOnError 设置为 false 就足够了。但是对于第二个问题,我开始怀疑 AckMode 记录在这种情况下是否正确,因为在每种情况下都会提交消息。
在这种情况下我应该移动到 MANUAL_IMMEDIATE 还是我错过了这里的重点?
非常感谢,亲切的问候
解决方案
ackOnError
自 2.3 版起默认为 false;它自 2.4 以来已被弃用(支持GenericErrorHandler.isAckAfterHandle()
)并在 master 上被删除(未来的 2.7)。
因为在第一次轮询期间容器已经提交了偏移量。
我不确定你的意思是什么。如果您指的是重新平衡侦听器中的逻辑,如果重新启动侦听器,您将获得一个新的消费者,因此它将position()
是最后提交的偏移量。
从版本 2.3.6 开始,您可以通过将 设置为assignmentCommitOption
来完全禁用该逻辑NEVER
。
2.2.x 已终止(连同 Boot 2.1),将不再发布。上一个 2.2.x 版本是 2.2.14。2.2.5 快2岁了;您应该尽量保持最新。
最新版本是 2.6.3。
推荐阅读
- javascript - Mongoose:TypeError:'mongooseSchemahere' 不是函数
- jquery - jquery如何发布数组
- sql - PostgreSQL 中截至当前日期的每月工作日
- python - Python 中的 Web 抓取错误 - TypeError: 'NoneType' object is not callable
- javascript - Instascan 不抛出错误,但不显示视频预览
- database - 可能分配给两个不同的实体
- javascript - 如何使用打字稿来获取对象数组的长度
- yaml - 从 yaml 生成相同的对象
- python - 如何根据具有相同哈希的元素从集合中检索特定元素?
- python - Tkinter PhotoImage 放置方法透明度(在画布上)