首页 > 解决方案 > Spring Kafka Batch Listener 应用重启

问题描述

我正在测试一个 Spring Kafka 批处理侦听器,批处理确认模式,一次轮询 3 条记录并将这些记录保存到数据库中。当 Spring Boot 应用程序重新启动时,我看到另外 3 条记录正在被消耗和处理。

如果批量较大(500)并且在关闭过程完成之前无法将记录保存到数据库怎么办?我们如何确保这些消息不会丢失或处理这种情况?

2021-09-21 22:37:22,448 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer:  processing batch size: 3, starting partition: 0, offset: 2797145
Disconnected from the target VM, address: '127.0.0.1:55506', transport: 'socket'
2021-09-21 22:38:02,388 WARN  [HikariPool-1 housekeeper]  com.zaxxer.hikari.pool.HikariPool$HouseKeeper: HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=48s84ms).
2021-09-21 22:38:02,393 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer: batch update -> 39.943579103
2021-09-21 22:38:02,494 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer:  processing batch size: 3, starting partition: 0, offset: 2797148
2021-09-21 22:38:02,495 INFO  [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1]  com.example.demo.kafka.CassandraConsumer: batch update -> 7.52304E-4

标签: kafka-consumer-apispring-kafka

解决方案


除非侦听器正常退出,否则不会提交偏移量 - 只要您使用具有默认错误处理程序 ( SeekToCurrentBatchErrorHandler) 的最新受支持版本。


推荐阅读