java - 卡夫卡给出:“在实际进入消费者组之前,组成员需要有一个有效的成员ID”
问题描述
我正在使用 Kafka 来使用 Java 中的消息。我想通过在本地机器上多次启动同一个应用程序来进行测试。当我启动时,我第一次能够开始使用来自该主题的消息。当我启动第二个时,我得到:
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
并且不会从该主题收到任何消息。如果我尝试启动更多它们,我会遇到同样的问题。
我用于 Kafka 的配置是
spring:
kafka:
bootstrap-servers: kafka:9092
consumer:
auto-offset-reset: earliest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.use.type.headers: false
listener:
missing-topics-fatal: false
我有两个话题
@Configuration
public class KafkaTopics {
@Bean("alertsTopic")
public NewTopic alertsTopic() {
return TopicBuilder.name("XXX.alerts")
.compact()
.build();
}
@Bean("serversTopic")
public NewTopic serversTopic() {
return TopicBuilder.name("XXX.servers")
.compact()
.build();
}
}
以及不同类文件中的两个侦听器。
@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
})
public void registerServer(
@Payload(required = false) ServerInfo serverInfo
)
@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
})
public void processAlert(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) long offset,
@Payload(required = false) AlertOnKafka alert)
解决方案
根据我的分析。这是正常行为,您可以更改日志级别以将其排除。
这样做的原因是,如果服务器检测到客户端可以支持member.id
,它将将该错误返回给客户端。这在KIP-394中有说明。
然后,客户端将使用生成的成员 ID 重新连接回服务器。
推荐阅读
- excel - 发送多封电子邮件:运行时错误“440”:对象不支持此方法
- firefox-addon - Firefox 扩展调试器显示“此页面没有来源”
- c# - 如何通过分配具有相似字段的其他对象来创建对象
- python - 返回类实例时的 Numpydoc 样式约定
- android-fragments - 如何在 Java 项目中使用 NavigationExtensions.kt?
- c - 是否有受 setlocale() 影响的函数的完整列表?
- swift - 如何旋转具有固定锚点的 SCNNode?
- scala - 在 spark scala 中通过 cuid 聚合和求和向量稀疏
- python - 将 2 个列表与 ref 与元素的位置进行比较?
- ruby-on-rails - Rails 路由不会取消订阅控制器操作