java - 立即更新 Kafka 偏移量
问题描述
我正在开发一个 Spring Boot 应用程序,它对推送到 Kafka 队列的消息做出反应。
版本是 Spring Boot 2.0.5,Finchley.SR1。
Kafka版本是kafka_2.12-1.1.0
我面临的问题是,有时当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。
作为开发的一部分,我在白天多次停止和启动该应用程序,并且没有看到相同的问题,只是偶尔出现。它也不与应用程序中的错误相关联,因为所有处理都是干净的。
我已将我的 Kafka 侦听器配置为使用 MANUAL_IMMEDIATE 确认,并在侦听器方法的末尾调用 ack.acknowledge() 。
我的 Spring 属性文件如下所示:
spring:
kafka:
bootstrap-servers: kafka:9092
listener:
ack-mode: MANUAL_IMMEDIATE
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
group-id: user-mgmt-app
我的 Listener 类定义如下:
@org.springframework.kafka.annotation.KafkaListener(topics = "aggregate-event-topic")
public void receive(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
...
ack.acknowledge();
}
我有一个应用程序实例正在运行,所以它每次都是消费者组的领导者。
我已经使用 Kafka 工具查看了消费者组的偏移量,我注意到的一件事是,当我在确认步骤断点应用程序时,它并没有更新 CURRENT-OFFSET,它似乎只更新了一次消息已处理。
./kafka-consumer-groups.sh --bootstrap-server kafka:9092 --group user-mgmt-app --describe
我从其他帖子中的理解是,MANUAL_IMMEDIATE 会在调用 acknowedge() 后立即更新 Kafka 服务器,而不是在批处理结束时。
我的理解不正确吗?如果是这样的话,无论如何都可以获得我想要的功能(例如在每次从分区读取时将批处理大小设置为 1,我猜这可能会影响性能)。如果是这样,我该怎么做(感激地接受任何帮助!)
TIA
解决方案
我面临的问题是,有时当我重新启动应用程序时,它会重播旧消息。这并不总是发生 - 我发现的唯一模式是它似乎是在几天不活动之后(比如周一早上,周末之后)。
我猜你没有使用 2.0.0 代理,其中消费者偏移量的默认保留期从 24 小时增加到 7 天。较旧的代理仅在一天后使偏移量到期 - 如果您在周末没有消息,这是典型的问题。
请参阅2.0.0 中的显着变化。
KIP-186 将默认偏移保留时间从 1 天增加到 7 天。这使得它不太可能在不经常提交的应用程序中“丢失”偏移量。它还增加了活动的偏移量集,因此可以增加代理的内存使用量。请注意,控制台使用者当前默认启用偏移提交,并且可能是大量偏移的来源,此更改现在将保留 7 天而不是 1 天。您可以通过设置代理配置 offsets.retention 来保留现有行为。分钟到 1440。
我不确定为什么您没有通过命令行工具看到偏移量更新。AckMode.RECORD 将在每条记录后更新偏移量。只要 Spring Kafka 版本 >= 1.3, MANUAL_IMMEDIATE 就会在您调用时更新acknowledge()
(Boot 2.0.x 将引入 Spring Kafka 2.0.x)。
推荐阅读
- ckeditor - CK 编辑器 - 粘贴来自 MS Word 的隐藏文本
- android - CallScreeningService 中的 onScreenCall() 方法未被调用,我希望此函数获取调用详细信息
- c# - 如何使用 Visual Studio 为 .net 框架 4.7.2 项目创建 SDK 样式项目?
- botframework - 如何在自适应卡片中放置可操作的图标
- excel - Excel 中的可搜索下拉列表
- elasticsearch - 如何将复合聚合与单个存储桶一起使用
- cocoapods - VialerPJSIP PJSIP “Pods-”目标具有传递依赖关系,包括静态链接的二进制文件:
- autohotkey - 如何在自动热键中重新映射 alt+tab?
- escaping - 如何使用 aws route53 list-resource-record-sets cli
- javascript - 添加的每个新项目都显示相同的日期时间