首页 > 解决方案 > 当我重置 Spring Boot 应用程序时消费者重启

问题描述

我有一个带有数据的 Kafka 主题,称为“topic01”

我想创建一个消费者,每次我启动我的 Spring Boot 2 应用程序时,都从头开始阅读该主题。

我有以下代码,如果我向我添加了一些新的主题,但在第一次开始时,它不会从主题的开头读到我。

@KafkaListener(topics = "topic01")
public void listenTopic01(ConsumerRecord<String, MiDTO> consumerRecord) throws Exception {
    logger.info("KafkaHandler");
    logger.info(consumerRecord.value().toString());
    logger.info(consumerRecord.key().toString());
    latch.countDown();
}

应用程序属性:

spring.kafka.consumer.group-id=XXXXX
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

我应该添加什么配置,以便每次我重新启动我的应用程序时,这个@KafkaListener 从头开始​​读取主题。

标签: spring-bootspring-kafka

解决方案


每次使用唯一(随机)组 ID,或者让您的侦听器类实现ConsumerSeekAware并添加

@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    consumer.seekToBeginning(partitions);
}

或者

@KafkaListener(topics = "topic01", 
    groupId = "#{T(java.util.UUID).randomUUID().toString()}")

推荐阅读