首页 > 解决方案 > Kafka:在运行 kafka-console-consumer 时,发送 FindCoordinator 请求和 Group coordinator 的无限循环不可用或无效

问题描述

当我尝试从其中一台代理机器上通过 kafka-console-consumer 读取某个主题的信息时,我在 kafka 调试日志中遇到了以下 4 条语句的无限循环。

[2021-07-06 08:15:14,499] DEBUG [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Sending FindCoordinator request to broker kafka01-broker:9094 (id: 5 rack: us-west-2b) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,504] DEBUG [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Received FindCoordinator response ClientResponse(receivedTimeMs=1625559314504, latencyMs=5, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1-1, correlationId=32), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=3, host='kafka02-broker', port=9094)) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,504] INFO [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Discovered group coordinator kafka02-broker:9094 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,504] INFO [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Group coordinator kafka02-broker:9094 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

[2021-07-06 08:15:14,504] DEBUG [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Sending FindCoordinator request to broker kafka01-broker:9094 (id: 5 rack: us-west-2b) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,507] DEBUG [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Received FindCoordinator response ClientResponse(receivedTimeMs=1625559314504, latencyMs=5, disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=3, clientId=consumer-test-consumer-group-1-1, correlationId=32), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='NONE', nodeId=3, host='kafka02-broker', port=9094)) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,507] INFO [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Discovered group coordinator kafka02-broker:9094 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2021-07-06 08:15:14,507] INFO [Consumer clientId=consumer-test-consumer-group-1-1, groupId=test-consumer-group-1] Group coordinator kafka02-broker:9094 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

and so on

查看 kafka 源代码,我可以看到下面的代码返回 true,因此是无限循环

AbstractCoordinator.java

client.isUnavailable(coordinator) in 

 protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
 ...
 } else if (coordinator != null && client.isUnavailable(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                markCoordinatorUnknown();
                timer.sleep(rebalanceConfig.retryBackoffMs);
 }

卡夫卡版本:2.12-2.5.0

如果我尝试从 kafka 命令 utils 连接到 kafka02-broker:9094,它似乎工作正常,我想知道为什么 client.isUnavailable(coordinator) 会返回 true。

标签: apache-kafkakafka-consumer-apiconsumer

解决方案


推荐阅读