首页 > 解决方案 > 立即更新 Kafka 偏移量

问题描述

我正在开发一个 Spring Boot 应用程序,它对推送到 Kafka 队列的消息做出反应。

版本是 Spring Boot 2.0.5,Finchley.SR1。

Kafka版本是kafka_2.12-1.1.0

我面临的问题是,有时当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。

作为开发的一部分,我在白天多次停止和启动该应用程序,并且没有看到相同的问题,只是偶尔出现。它也不与应用程序中的错误相关联,因为所有处理都是干净的。

我已将我的 Kafka 侦听器配置为使用 MANUAL_IMMEDIATE 确认,并在侦听器方法的末尾调用 ack.acknowledge() 。

我的 Spring 属性文件如下所示:

spring:
  kafka:
    bootstrap-servers: kafka:9092
    listener:
      ack-mode: MANUAL_IMMEDIATE
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      group-id: user-mgmt-app

我的 Listener 类定义如下:

@org.springframework.kafka.annotation.KafkaListener(topics = "aggregate-event-topic")
public void receive(ConsumerRecord<?, ?> cr, Acknowledgment ack) {

   ...
   ack.acknowledge();

}

我有一个应用程序实例正在运行,所以它每次都是消费者组的领导者。

我已经使用 Kafka 工具查看了消费者组的偏移量,我注意到的一件事是,当我在确认步骤断点应用程序时,它并没有更新 CURRENT-OFFSET,它似乎只更新了一次消息已处理。

./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group user-mgmt-app --describe

我从其他帖子中的理解是,MANUAL_IMMEDIATE 会在调用 acknowedge() 后立即更新 Kafka 服务器,而不是在批处理结束时。

我的理解不正确吗?如果是这样的话,无论如何都可以获得我想要的功能(例如在每次从分区读取时将批处理大小设置为 1,我猜这可能会影响性能)。如果是这样,我该怎么做(感激地接受任何帮助!)

TIA

标签: javaspringspring-bootapache-kafkaspring-kafka

解决方案


我面临的问题是,有时当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。

我猜你没有使用 2.0.0 代理,其中消费者偏移量的默认保留期从 24 小时增加到 7 天。较旧的代理仅在一天后使偏移量到期 - 如果您在周末没有消息,这是典型的问题。

请参阅2.0.0 中的显着变化

KIP-186 将默认偏移保留时间从 1 天增加到 7 天。这使得它不太可能在不经常提交的应用程序中“丢失”偏移量。它还增加了活动的偏移量集,因此可以增加代理的内存使用量。请注意,控制台使用者当前默认启用偏移提交,并且可能是大量偏移的来源,此更改现在将保留 7 天而不是 1 天。您可以通过设置代理配置 offsets.retention 来保留现有行为。分钟到 1440。

我不确定为什么您没有通过命令行工具看到偏移量更新。AckMode.RECORD 将在每条记录后更新偏移量。只要 Spring Kafka 版本 >= 1.3, MANUAL_IMMEDIATE 就会在您调用时更新acknowledge()(Boot 2.0.x 将引入 Spring Kafka 2.0.x)。


推荐阅读