首页 > 解决方案 > Spring Cloud @StreamListener 消费者未在消费者组中注册 CONSUMER-ID、HOST 和 CLIENT-ID

问题描述

我们有一个 spring 云消费者来读取来自一个 kafka 主题的消息。以下是频道界面

    @Component
    public interface CollectionStreams {


       String INPUT_REPORT = "report-in";
       String OUTPUT_REPORT = "report-out";

       @Input(INPUT_REPORT)
       SubscribableChannel inboundReport();

       @Output(OUTPUT_REPORT)
       MessageChannel outboundReportToJM();
}

我们面临的问题是,在消费者组“报告”中列出时,我们无法按预期看到 CONSUMER-ID、HOST 和 CLIENT-ID。

[root@innolx131112 templates]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group report
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'report' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
report          3          2               3               1               -               -               -
report          1          4               4               0               -               -               -
report          2          1               1               0               -               -               -
report          4          2               2               0               -               -               -
report          0          2               2               0               -               -               -

顺便说一句,我们在 Kubernetes 中运行我们的应用程序以及 Kafka。

由于这个问题,我们无法将我们的应用程序的多个 POD 作为所有 POD

以下是频道界面

@Component
public interface CollectionStreams {


   String INPUT_REPORT = "report-in";
   String OUTPUT_REPORT = "report-out";

   @Input(INPUT_REPORT)
   SubscribableChannel inboundReport();

   @Output(OUTPUT_REPORT)
   MessageChannel outboundReportToJM();
}

我们已经定义了如下方法来从主题中读取消息。

@StreamListener(CollectionStreams.INPUT_REPORT)
    //public void handleMessage(@Payload MessageT message) {
    public void handleMessage(@Payload MessageT message, @Headers MessageHeaders msg) {

以下是配置yaml

**

cloud:
        stream:
          kafka:
            binder:
              brokers: kafka
              autoCreateTopics: false
            bindings:
              report-in:
                consumer:
                  autoCommitOffset: false
                  autoCommit: false
                  auto-offset-reset: earliest
                  autoCommitOnError: false
                  resetOffsets: false
                  autoRebalanceEnabled: false
                  ackEachRecord: false
          bindings:
            report-in:
              destination: report
              contentType: application/json
              group: report
              consumer:
                concurrency: 5
                partitioned: true
            report-out:
              destination: jobmanager
              contentType: application/json
              group: jobmanager
              producer:
                autoAddPartitions: true

**


我们还有另一个消费者,我们没有为其设置任何与 kafka 相关的消费者道具,令人惊讶的是,这些消费者正在正确地注册自己。

配置:

cloud:
    stream:
      kafka:
        binder:
          autoCreateTopics: false
          brokers: kafka
      bindings:
        parse-in:
          destination: parser
          contentType: application/json
          group: parser
          consumer:
            concurrency: 3
            partitioned: true
        parse-out:
          destination: jobmanager
          contentType: application/json
          group: jobmanager
          producer:
            partitionKeyExpression: headers['contentType']
            autoAddPartitions: true

并描述命令输出如下

[root@innolx131112 shyama]# kubectl -n tmo-ccm exec kafka-test-client -- /usr/bin/kafka-consumer-groups --bootstrap-server kafka:9092 --describe -group parser
Note: This will not show information about old Zookeeper-based consumers.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
parser          0          14              14              0               consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26   consumer-3
parser          1          13              13              0               consumer-3-cb99e45e-21b3-4efb-ac7b-4f642a9486be /192.168.0.26   consumer-3
parser          2          15              15              0               consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26   consumer-4
parser          3          15              15              0               consumer-4-bec1e4af-d771-47fe-83ad-3440f3a6d4bd /192.168.0.26   consumer-4
parser          4          12              12              0               consumer-5-b9ac7e36-58cf-40cb-b37d-a0fa092a0d56 /192.168.0.26   consumer-5

是不是因为我们为第一个消费者(报告消费者)提供了kafka相关的道具,所以无法注册???

标签: apache-kafkaspring-kafka

解决方案


Consumer group 'report' has no active members.

这仅仅意味着当你输入命令时你的应用程序没有运行。

该信息是暂时的,在应用停止时不会保留。

下次可能会将分区分配给不同的实例。

编辑

每当我们使用同一组实例化更多消费者时,他们正在处理已由旧消费者处理的相同旧消费者

好吧,更仔细地查看您的配置,这正是您所要求的......

autoRebalanceEnabled: false

... 表示 Kafka 不会使用组管理,分区将由 Spring Cloud 流分配。

autoCommitOffset: false

意味着 Spring 不会提交任何偏移量,这是您的应用程序的责任。如果你不提交偏移量,你会得到你观察到的行为。


推荐阅读