首页 > 解决方案 > Spring-kafka - 带有 AckMode Record 和自定义 SeekToCurrent 错误处理程序的消息丢失

问题描述


我试图弄清楚如何处理消息侦听器中发生的**瞬态错误**,同时**消费来自 Kafka 主题的消息**。
我正在使用配置有 **custom errorHandler** 的 **@KafkaListener**。当 **listener throws** 和 Exception 并且 errorHandler 认为它是 *​​transient** 时,处理程序 **seeks** client 到 **current offset** 以便在下一次轮询时重新获取消息。

侦听器在循环中接收消息,等待解决瞬态问题,如预期的那样,但在某些情况下消息丢失了

我用的是spring-kafka 2.2.5,请看Kafka的spring boot相关配置

spring:
  kafka:
    bootstrap-servers: <HOST>:<PORT>
    consumer:
      auto-offset-reset: earliest
      enable-auto-commit: false
      group-id: ConsumerGroup
      properties:
        max.poll.records: 1
    listener:
      ack-mode: record
      concurrency: 1

据我所见,应用程序在以下情况下会丢失消息:

因此,为了解决第一个问题,将 ackOnError 设置为 false 就足够了。但是对于第二个问题,我开始怀疑 AckMode 记录在这种情况下是否正确,因为在每种情况下都会提交消息。

在这种情况下我应该移动到 MANUAL_IMMEDIATE 还是我错过了这里的重点?

非常感谢,亲切的问候

标签: spring-kafka

解决方案


ackOnError自 2.3 版起默认为 false;它自 2.4 以来已被弃用(支持GenericErrorHandler.isAckAfterHandle())并在 master 上被删除(未来的 2.7)。

因为在第一次轮询期间容器已经提交了偏移量。

我不确定你的意思是什么。如果您指的是重新平衡侦听器中的逻辑,如果重新启动侦听器,您将获得一个新的消费者,因此它将position()是最后提交的偏移量。

从版本 2.3.6 开始,您可以通过将 设置为assignmentCommitOption来完全禁用该逻辑NEVER

2.2.x 已终止(连同 Boot 2.1),将不再发布。上一个 2.2.x 版本是 2.2.14。2.2.5 快2岁了;您应该尽量保持最新。

最新版本是 2.6.3。


推荐阅读