Spring Kafka consumer exception -> org.apache.kafka.common.errors.NotCoordinatorException

Problem description

I am trying to configure a Kafka listener. Below is the code for my consumer configuration:

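import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Broker address, read from the kafka.bootstrap-server property
    @Value("${kafka.bootstrap-server}")
    private String bootstrapAddress;

    // Consumer factory for String keys and values in consumer group "d"
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG,
          "d");
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory referenced by name from the @KafkaListener method
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
      kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}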

Below is the listener:

@KafkaListener(topics = "demo",groupId = "d",containerFactory = "kafkaListenerContainerFactory")
public void consume(@Payload String msg){
    System.out.println(msg);
}

Whenever I start the project, the consumer fails to come up. Here are the logs:

2021-06-23 12:09:43.209  INFO 15540 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.2
2021-06-23 12:09:43.210  INFO 15540 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: da65af02e5856e34
2021-06-23 12:09:43.210  INFO 15540 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1624430383208
2021-06-23 12:09:43.212  INFO 15540 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-d-1, groupId=d] Subscribed to topic(s): demo
2021-06-23 12:09:43.235  INFO 15540 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2021-06-23 12:09:43.242  INFO 15540 --- [           main] c.k.k.KafkatutorialApplication           : Started KafkatutorialApplication in 1.903 seconds (JVM running for 2.578)
2021-06-23 12:09:43.482  INFO 15540 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-d-1, groupId=d] Cluster ID: dQMLeqVTRPKuOayKrjAvbA
2021-06-23 12:09:43.483  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.485  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.496  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.501  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.501  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.610  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.619  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.

org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

2021-06-23 12:09:43.620  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.627  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.630  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.631  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.748  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.750  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.

org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.

2021-06-23 12:09:43.750  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.756  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.759  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.759  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.873  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.875  INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.
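
A side note on reading the log above: the coordinator id 2147483647 is not a broker id. As far as I know, the Java client opens a dedicated connection to the group coordinator and labels it with Integer.MAX_VALUE minus the broker's id, so this value would correspond to broker id 0. The sketch below only undoes that arithmetic; the class and method names are made up for illustration.

```java
public class CoordinatorIdDecoder {

    // Assumption: the client derives the coordinator connection id as
    // Integer.MAX_VALUE - brokerId, so subtracting again recovers the broker id.
    static int brokerIdFromCoordinatorId(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }

    public static void main(String[] args) {
        int idFromLog = 2147483647; // value shown in the log lines above
        System.out.println("broker.id = " + brokerIdFromCoordinatorId(idFromLog));
    }
}
```

So the repeated "Discovered group coordinator localhost:9092" lines all point at the same single broker, which is consistent with a one-broker setup.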

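This is the Kafka configuration I am using:

kafka.topic=demo
kafka.bootstrap-server=localhost:9092

The Kafka topic was created with a replication factor of 1 and 1 partition.

Can you suggest what the problem is?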
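
Note that the settings of the demo topic are separate from the internal __consumer_offsets topic, which is what the group coordinator depends on: the coordinator for a group is the broker leading the __consumer_offsets partition that the group id maps to. A minimal sketch of that mapping follows, assuming the broker default of 50 partitions for offsets.topic.num.partitions and assuming the mapping is the non-negative hash of the group id modulo the partition count. The class and method names are hypothetical.

```java
public class OffsetsPartitionForGroup {

    // Assumption: the group id's hashCode is made non-negative
    // (Integer.MIN_VALUE is treated as 0) and taken modulo the number of
    // partitions of the internal __consumer_offsets topic.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        int hash = groupId.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        // Group "d" from the consumer configuration, 50 partitions assumed
        System.out.println("__consumer_offsets-" + partitionFor("d", 50));
    }
}
```

If that partition has no healthy leader on the broker, a coordinator error like the one in the logs is one plausible outcome, so describing __consumer_offsets with the kafka-topics tool may be a reasonable first check. This is a guess at where to look, not a confirmed diagnosis.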

Tags: java, spring, spring-boot, apache-kafka, spring-kafka

Solution

