spring-boot - 当我重置 Spring Boot 应用程序时消费者重启
问题描述
我有一个带有数据的 Kafka 主题,称为“topic01”
我想创建一个消费者,每次我启动我的 Spring Boot 2 应用程序时,都从头开始阅读该主题。
我有以下代码,如果我向我添加了一些新的主题,但在第一次开始时,它不会从主题的开头读到我。
@KafkaListener(topics = "topic01")
public void listenTopic01(ConsumerRecord<String, MiDTO> consumerRecord) throws Exception {
logger.info("KafkaHandler");
logger.info(consumerRecord.value().toString());
logger.info(consumerRecord.key().toString());
latch.countDown();
}
应用程序属性:
spring.kafka.consumer.group-id=XXXXX
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
我应该添加什么配置,以便每次我重新启动我的应用程序时,这个@KafkaListener 从头开始读取主题。
解决方案
每次使用唯一(随机)组 ID,或者让您的侦听器类实现ConsumerSeekAware
并添加
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
consumer.seekToBeginning(partitions);
}
或者
@KafkaListener(topics = "topic01",
groupId = "#{T(java.util.UUID).randomUUID().toString()}")
推荐阅读
- python - 我的 if 语句有问题;没有输出或错误
- javascript - 表示问题类型错误:无法读取未定义的属性“方法”
- python - PuLP - 有没有办法从 dict 访问变量名,而不是从 .variables() 解析字符串版本?
- java - android studio和公共类/属性的问题
- c# - 我如何知道 Excel 应用程序属性已更改(例如:UseSystemSeparator)或已选择自定义选项卡?
- ruby-on-rails - 如何在 Google Cloud Build 中构建 Docker 映像并在后续构建步骤中使用?
- c - 如何在检查变量的函数中使用 printf?
- c# - IEnumerable<> vs List<> 作为参数
- fortran - Valgrind (Helgrind) 误报?
- android - Android:在另一个不发射的情况下收集 Kotlin Flow