首页 > 解决方案 > 从头开始阅读 Kafka 主题

问题描述

我有一个 spring boot 应用程序,它不断读取来自特定主题的所有消息。我的问题是我有一个问题,当应用程序重新启动时,它需要再次读取主题中的所有消息。我认为这两个选项结合起来可以做到,但它不起作用:

这是我的方法;

 @StreamListener(myTopic)
 public void handle(@Payload Input input) {
    /** Do other stuff not related **/
 }

这是我的 application.yaml

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: group-topic
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

标签: springspring-bootapache-kafkayamlkafka-topic

解决方案


您需要将groupapplication.yaml 中的 更改为新的唯一组名称(请参见下面的示例,其中我将使用者组设置为new-consumer-group-id

spring:
  cloud.stream:
    bindings:
      myTopic:
        destination: TOPIC
        content-type: application/**avro
        group: new-consumer-group-id
        concurrency: 1
        resetOffsets: true
        startOffset: earliest

如果继续使用同一个 ConsumerGroup,配置resetOffsetstartingOffset不会产生任何影响。

作为替代方案,您可以使用命令行工具重置每个消费者组的偏移量kafka-consumer-groups.sh。一个模板如下所示:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
 --execute --reset-offsets \
 --group groupA\
 --topic topicC \
 --partition 0 \
 --to-offset <insert number>

推荐阅读