Kafka consumer problem (The provided member is not known in the current generation)

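Problem description

I am using Telegraf with the Kafka consumer input plugin to forward messages to InfluxDB. When it tries to consume from the Kafka server, the Telegraf log shows the following: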

Apr 12 11:25:13 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:13Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: read tcp TELEGRAF_IP:38494->KAFKA_IP:9093: i/o timeout
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not known in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not known in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not known in the current generation.
Apr 12 11:25:28 algtigtelegraf telegraf[848255]: 2021-04-12T08:25:28Z E! [inputs.kafka_consumer] Error in plugin: kafka: error while consuming telegraf/0: kafka server: The provided member is not known in the current generation.

In the Kafka logs, I can see the following entries related to the errors above:

[2021-04-12 08:26:42,559] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 238 (__consumer_offsets-6) (reason: removing member Telegraf-bd5ba2ac-e037-4d06-91d3-155d9cc63981 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:26:42,560] INFO [GroupCoordinator 0]: Group telegraf_metrics_consumers with generation 239 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,424] INFO [GroupCoordinator 0]: Dynamic Member with unknown member id joins group telegraf_metrics_consumers in Empty state. Created a new member id Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,424] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 239 (__consumer_offsets-6) (reason: Adding new member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,426] INFO [GroupCoordinator 0]: Stabilized group telegraf_metrics_consumers generation 240 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:01,441] INFO [GroupCoordinator 0]: Assignment received from leader for group telegraf_metrics_consumers for generation 240 (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 in group telegraf_metrics_consumers has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Preparing to rebalance group telegraf_metrics_consumers in state PreparingRebalance with old generation 240 (__consumer_offsets-6) (reason: removing member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: Group telegraf_metrics_consumers with generation 241 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2021-04-12 08:27:31,687] INFO [GroupCoordinator 0]: Dynamic Member with unknown member id joins group telegraf_metrics_consumers in Empty state. Created a new member id Telegraf-96ac6751-4f2e-4a67-80ee-1175c30540e4 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
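
To make the timing in the broker log easier to read, here is a minimal Python sketch that measures the gap between the "Stabilized group" entry for generation 240 and the "has failed" entry for the same member. The two log lines are copied from the output above and shortened; the helper names `parse_timestamp` and `seconds_between` are made up for this illustration.

```python
import re
from datetime import datetime

# Two entries copied from the GroupCoordinator log above, with the trailing
# "(__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)" part cut off.
STABILIZED = (
    "[2021-04-12 08:27:01,426] INFO [GroupCoordinator 0]: "
    "Stabilized group telegraf_metrics_consumers generation 240"
)
FAILED = (
    "[2021-04-12 08:27:11,459] INFO [GroupCoordinator 0]: "
    "Member Telegraf-b1168722-a4f9-42cc-b0c3-10c78e2cf4d1 in group "
    "telegraf_metrics_consumers has failed, removing it from the group"
)

# Matches the leading "[YYYY-MM-DD HH:MM:SS,mmm]" timestamp of a broker log line.
TIMESTAMP = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),(\d{3})\]")


def parse_timestamp(line):
    """Return the timestamp at the start of a broker log line as a datetime."""
    match = TIMESTAMP.match(line)
    if match is None:
        raise ValueError("line does not start with a broker timestamp")
    base = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
    # The part after the comma is milliseconds.
    return base.replace(microsecond=int(match.group(2)) * 1000)


def seconds_between(first_line, second_line):
    """Return the elapsed seconds between two broker log lines."""
    delta = parse_timestamp(second_line) - parse_timestamp(first_line)
    return delta.total_seconds()


if __name__ == "__main__":
    print(seconds_between(STABILIZED, FAILED))
```

The member is removed roughly 10 seconds after the group stabilizes, and the broker gives the reason as heartbeat expiration. That would be consistent with a session timeout of about 10 seconds expiring without any heartbeat reaching the broker, although the timeout value itself is an inference and is not stated anywhere in the logs.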

I am using the following Telegraf configuration for the kafka_consumer input plugin:

[[inputs.kafka_consumer]]
  ## Kafka brokers.
  brokers = ["KAFKA_IP:9093"]

  ## Topics to consume.
  topics = ["telegraf"]

  ## SSL parameters.
  insecure_skip_verify = true

  ## Data format
  data_format = "influx"

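When I run Telegraf with kafka_consumer inside the network and reach the Kafka server through its internal IP, it works as expected. In the failing case, I am trying to reach the Kafka server through its public IP. Any idea where the problem might be in this case?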
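
Since the first Telegraf error is an i/o timeout on the connection to port 9093, one basic check is whether a plain TCP connection to the broker address can be opened from the Telegraf host at all. The following is a minimal Python sketch of such a check, not a full diagnosis; the function name `can_connect` is hypothetical, and `KAFKA_IP` is the same placeholder used in the configuration above.

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    # Replace the placeholder with the broker's public address before running.
    print(can_connect("KAFKA_IP", 9093))
```

A successful result only covers the bootstrap address from `brokers`. A Kafka client also connects to the addresses the broker advertises in its metadata, so those would need the same check from the Telegraf host.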

Tags: apache-kafka, kafka-consumer-api, telegraf, telegraf-inputs-plugin, telegraf-plugins

Solution

