首页 > 解决方案 > Kafka-Elasticsearch 接收器连接器不工作

问题描述

我正在尝试将数据从 Kafka 发送到 Elasticsearch。我检查了我的 Kafka Broker 是否正常工作,因为我可以看到我生成到某个主题的消息被 Kafka Consumer 读取。但是,当我尝试将 Kafka 连接到 Elasticsearch 时,出现以下错误。

命令:

connect-standalone etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

错误:

ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:83)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
    ... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我的 Docker 撰写文件:

version: '3'
services:
  zookeeper:
    container_name : zookeeper
    image: zookeeper
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888

  kafka:
    container_name : kafka
    image: bitnami/kafka:1.0.0-r5
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_BROKER_ID: "42"
      KAFKA_ADVERTISED_HOST_NAME: "kafka"
      ALLOW_PLAINTEXT_LISTENER: "yes" 
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

  elasticsearch:
    container_name : elasticsearch
    image:
      docker.elastic.co/elasticsearch/elasticsearch:7.8.0
    environment:
      - node.name=elasticsearch
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=elasticsearch
      - bootstrap.memory_lock=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - data99:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
  kibana:
    container_name : kibana
    image: docker.elastic.co/kibana/kibana:7.8.0
    # environment:
      # - SERVER_NAME=Local kibana
      # - SERVER_HOST=0.0.0.0
      # - ELASTICSEARCH_URL=elasticsearch:9400
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

  kafka-connect:
    container_name : kafka-connect
    image: confluentinc/cp-kafka-connect:5.3.1
    ports:
      - 8083:8083
    depends_on:
      - zookeeper
      - kafka
    volumes:
      - $PWD/connect-plugins:/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: docker-kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: docker-kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: docker-kafka-connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_KEY_CONVERTER-SCHEMAS_ENABLE: "false"
      CONNECT_VALUE_CONVERTER-SCHEMAS_ENABLE: "false"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_TOPICS: "test-elasticsearch-sink"
      CONNECT_TYPE_NAME: "type.name=kafka-connect"
      CONNECT_PLUGIN_PATH: '/usr/share/java' #'/usr/share/java'
      # Interceptor config
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
      CONNECT_KAFKA_HEAP_OPTS: "-Xms256m -Xmx512m"

volumes:
  data99:
    driver: local

我检查了其他一些问题和答案,但无法找到解决此问题的方法。

提前致谢!

标签: elasticsearchapache-kafkaapache-kafka-connect

解决方案


Connect 容器已经启动了 Connect Distributed Server。您应该使用 HTTP 和 JSON 属性来配置 Elastic 连接器,而不是在容器外壳中执行并发出connect-standalone默认使用在容器本身中运行的代理的命令。

同样,默认情况下,Elastic 快速入门文件期望 Elasticsearch 在 Connect 容器中运行


推荐阅读