elasticsearch - Kafka-Elasticsearch 接收器连接器不工作
问题描述
我正在尝试将数据从 Kafka 发送到 Elasticsearch。我检查了我的 Kafka Broker 是否正常工作,因为我可以看到我生成到某个主题的消息被 Kafka Consumer 读取。但是,当我尝试将 Kafka 连接到 Elasticsearch 时,出现以下错误。
命令:
connect-standalone etc/schema-registry/connect-avro-standalone.properties \
etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
错误:
ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectStandalone)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:83)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
... 2 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
我的 Docker 撰写文件:
version: '3'
services:
zookeeper:
container_name : zookeeper
image: zookeeper
ports:
- 2181:2181
- 2888:2888
- 3888:3888
kafka:
container_name : kafka
image: bitnami/kafka:1.0.0-r5
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: "42"
KAFKA_ADVERTISED_HOST_NAME: "kafka"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
elasticsearch:
container_name : elasticsearch
image:
docker.elastic.co/elasticsearch/elasticsearch:7.8.0
environment:
- node.name=elasticsearch
- cluster.name=es-docker-cluster
- discovery.seed_hosts=elasticsearch
- bootstrap.memory_lock=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- discovery.type=single-node
ulimits:
memlock:
soft: -1
hard: -1
volumes:
- data99:/usr/share/elasticsearch/data
ports:
- 9200:9200
kibana:
container_name : kibana
image: docker.elastic.co/kibana/kibana:7.8.0
# environment:
# - SERVER_NAME=Local kibana
# - SERVER_HOST=0.0.0.0
# - ELASTICSEARCH_URL=elasticsearch:9400
ports:
- "5601:5601"
depends_on:
- elasticsearch
kafka-connect:
container_name : kafka-connect
image: confluentinc/cp-kafka-connect:5.3.1
ports:
- 8083:8083
depends_on:
- zookeeper
- kafka
volumes:
- $PWD/connect-plugins:/connect-plugins
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:9092
CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect
CONNECT_CONFIG_STORAGE_TOPIC: docker-kafka-connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: docker-kafka-connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: docker-kafka-connect-status
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_KEY_CONVERTER-SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER-SCHEMAS_ENABLE: "false"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_LOG4J_ROOT_LOGLEVEL: "ERROR"
CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
CONNECT_TOPICS: "test-elasticsearch-sink"
CONNECT_TYPE_NAME: "type.name=kafka-connect"
CONNECT_PLUGIN_PATH: '/usr/share/java' #'/usr/share/java'
# Interceptor config
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.3.1.jar
CONNECT_KAFKA_HEAP_OPTS: "-Xms256m -Xmx512m"
volumes:
data99:
driver: local
我检查了其他一些问题和答案,但无法找到解决此问题的方法。
提前致谢!
解决方案
Connect 容器已经启动了 Connect Distributed Server。您应该使用 HTTP 和 JSON 属性来配置 Elastic 连接器,而不是在容器外壳中执行并发出connect-standalone
默认使用在容器本身中运行的代理的命令。
同样,默认情况下,Elastic 快速入门文件期望 Elasticsearch 在 Connect 容器中运行
推荐阅读
- c - c中同一结构的多个名称
- r - 如何使用 par 和 pdf 绘制许多条形图并且不让它们在 R 中被截断
- javascript - 如何将我的服务器端 PHP 代码添加到此表单向导?
- jquery - 用于 iframe 的 jquery iframe onload
- c++ - Qt 虚拟键盘的 InputPanel 在 Raspberry Pi 0 上为空白
- lucene - Lucene 字符串和数字范围查询
- php - 确保密码至少包含 3 个数组中的一个字符
- python - xlwings 中的格式表
- android - 20+天后由于碎片导致内存不足错误android
- android - Android TV Emulator Play 商店登录