kafka-consumer-api - Spring Kafka Batch Listener 应用重启
问题描述
我正在测试一个 Spring Kafka 批处理侦听器,批处理确认模式,一次轮询 3 条记录并将这些记录保存到数据库中。当 Spring Boot 应用程序重新启动时,我看到另外 3 条记录正在被消耗和处理。
如果批量较大(500)并且在关闭过程完成之前无法将记录保存到数据库怎么办?我们如何确保这些消息不会丢失或处理这种情况?
2021-09-21 22:37:22,448 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797145
Disconnected from the target VM, address: '127.0.0.1:55506', transport: 'socket'
2021-09-21 22:38:02,388 WARN [HikariPool-1 housekeeper] com.zaxxer.hikari.pool.HikariPool$HouseKeeper: HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=48s84ms).
2021-09-21 22:38:02,393 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 39.943579103
2021-09-21 22:38:02,494 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: processing batch size: 3, starting partition: 0, offset: 2797148
2021-09-21 22:38:02,495 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.kafka.CassandraConsumer: batch update -> 7.52304E-4
解决方案
除非侦听器正常退出,否则不会提交偏移量 - 只要您使用具有默认错误处理程序 ( SeekToCurrentBatchErrorHandler
) 的最新受支持版本。
推荐阅读
- firebase-cloud-messaging - Are FCM topics name globally unique?
- c# - 以字节形式获取图像并发送到 Facebook Api
- c - C/数字逻辑 - 为什么我的零初始化变量会改变值?
- mysql - 使用 Node.js 开始开发的交钥匙 docker-compose 解决方案?
- google-apps-script - 按钮作为 Google 表格上的链接
- html - 我试图制作两个交互按钮。它们是交互式的,但不是按钮
- html - 提交视图函数后什么都不做
- flutter - 对齐容器中的多个文本
- r - 在 R 中导入 NetCDF 文件时,光栅砖中的经纬度丢失
- reactjs - Gatsby 项目文件结构中的模板文件夹有什么用