java - Spring Kafka consumer exception -> org.apache.kafka.common.errors.NotCoordinatorException
Problem description
I am trying to configure a Kafka listener. Below is the code for my consumer configuration:
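import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    // Broker address, read from the kafka.bootstrap-server property.
    @Value("${kafka.bootstrap-server}")
    private String bootstrapAddress;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // Container factory that the listener refers to by bean name.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}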
Below is the listener:
@KafkaListener(topics = "demo", groupId = "d", containerFactory = "kafkaListenerContainerFactory")
public void consume(@Payload String msg) {
    System.out.println(msg);
}
Whenever I start the project, the consumer fails to start up correctly. Here are the logs:
2021-06-23 12:09:43.209 INFO 15540 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.2
2021-06-23 12:09:43.210 INFO 15540 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: da65af02e5856e34
2021-06-23 12:09:43.210 INFO 15540 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1624430383208
2021-06-23 12:09:43.212 INFO 15540 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-d-1, groupId=d] Subscribed to topic(s): demo
2021-06-23 12:09:43.235 INFO 15540 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2021-06-23 12:09:43.242 INFO 15540 --- [ main] c.k.k.KafkatutorialApplication : Started KafkatutorialApplication in 1.903 seconds (JVM running for 2.578)
2021-06-23 12:09:43.482 INFO 15540 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-d-1, groupId=d] Cluster ID: dQMLeqVTRPKuOayKrjAvbA
2021-06-23 12:09:43.483 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.485 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.496 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.501 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.501 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.610 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.619 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
2021-06-23 12:09:43.620 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.627 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.630 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.631 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.748 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.750 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.
org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
2021-06-23 12:09:43.750 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] (Re-)joining group
2021-06-23 12:09:43.756 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.759 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.759 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2021-06-23 12:09:43.873 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2021-06-23 12:09:43.875 INFO 15540 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-d-1, groupId=d] Rebalance failed.
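This is the Kafka configuration I am using:
kafka.topic=demo
kafka.bootstrap-server=localhost:9092
The Kafka topic was created with a replication factor of 1 and 1 partition.
Can you suggest what the problem might be?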
Solution
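The logs do not identify a root cause by themselves, but one detail in them can be decoded and may help narrow things down. The Kafka consumer connects to the group coordinator under a separate connection id computed as Integer.MAX_VALUE minus the broker id, so the "id: 2147483647" in the "Discovered group coordinator" lines corresponds to broker id 0. The sketch below is only a diagnostic aid, not a fix; the class name CoordinatorIdDecoder and the method brokerIdFromLogLine are hypothetical names introduced for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Kafka or Spring): recovers the broker id
// from the coordinator connection id printed in the consumer logs.
class CoordinatorIdDecoder {

    // Matches the "(id: N rack: ...)" fragment of a coordinator log line.
    private static final Pattern COORDINATOR_ID = Pattern.compile("\\(id: (\\d+) rack:");

    /** Returns the broker id encoded in the log line, or -1 if no id is present. */
    static int brokerIdFromLogLine(String logLine) {
        Matcher m = COORDINATOR_ID.matcher(logLine);
        if (!m.find()) {
            return -1;
        }
        int connectionId = Integer.parseInt(m.group(1));
        // The client uses Integer.MAX_VALUE - brokerId for the coordinator connection.
        return Integer.MAX_VALUE - connectionId;
    }

    public static void main(String[] args) {
        String line = "Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)";
        System.out.println(brokerIdFromLogLine(line)); // prints 0
    }
}
```

With the log lines above, this yields broker 0 at localhost:9092. The consumer is therefore repeatedly sent back to the same single broker, which then rejects the join with NotCoordinatorException, so the broker-side state for that group (rather than the Spring configuration shown) is a plausible place to look next.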