首页 > 解决方案 > Connect Kafka topics to elasticsearch using fast-data-dev using docker

问题描述

I will like to send data from kafka to elasticsearch using fast-data-dev docker image and elasticsearch latest, kibana latest. But I got the following error:

org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error: at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:160) at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:144)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:74)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:48)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:304)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:195)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) Caused by: io.searchbox.client.config.exception.CouldNotConnectException: Could not connect to http://127.0.0.1:9200
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:73)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:274)
    at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:153)
    ... 12 more 
Caused by: org.apache.http.conn.HttpHostConnectException: Connect to 127.0.0.1:9200 [/127.0.0.1] failed: Connection refused (Connection refused)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:159)
    at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
    at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
    at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
    at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
    at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
    at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
    at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108)
    at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:136)
    at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
    ... 15 more
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:607)
    at org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75)
    at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
    ... 26 more

Here is my docker-compose file:

services:

#kafka cluster service                                                                    
  kafka-cluster:                                                                                
    image: landoop/fast-data-dev:latest         # image with kafka, zk, landoop, lense.                                                        
    environment:                                                                                   
      ADV_HOST: 127.0.0.1                                                                          
      RUNTESTS: 0                               # We disable running test so the cluster starts faster        
    ports:                                                                                         
      - 2181:2181                               # Zookeeper                                                   
      - 3030:3030                               # Landoop UI                                                  
      - 8081-8083:8081-8083                     # REST Proxy, Schema Registry, Kafka connect ports            
      - 9581-9585:9581-9585                     # JMX Ports                                                   
      - 9092:9092                               # Kafka Broker
    networks:                                   # link kafka-cluster service to the elk network.  
      - elk 


  elasticsearch:
    build:
      context: elasticsearch/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./elasticsearch/config/elasticsearch.yml
        target: /usr/share/elasticsearch/config/elasticsearch.yml
        read_only: true
      - type: volume
        source: elasticsearch
        target: /usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    environment:
      ES_JAVA_OPTS: "-Xmx256m -Xms256m"
      # ELASTIC_PASSWORD: changeme
      # Use single node discovery in order to disable production mode and avoid bootstrap checks.
      # see: https://www.elastic.co/guide/en/elasticsearch/reference/current/bootstrap-checks.html
      discovery.type: single-node    # The elastic node will elect itself master node and will not try to join other cluster.   
    networks:
      - elk
 kibana:
    build:
      context: kibana/
      args:
        ELK_VERSION: $ELK_VERSION
    volumes:
      - type: bind
        source: ./kibana/config/kibana.yml
        target: /usr/share/kibana/config/kibana.yml
        read_only: true
    ports:
      - "5601:5601"
    networks:
      - elk
    depends_on:
      - elasticsearch

networks:
  elk:
    driver: bridge

volumes:
  elasticsearch:

Here is my sink connector configuration to elasticsearch

connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
type.name=kafka-connect
tasks.max=1
topics=elastic
topic.index.map="elastic:index1"
topic.key.ignore=true
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=http://127.0.0.1:9200
key.converter=org.apache.kafka.connect.json.JsonConverter
topic.schema.ignore=true

I linked wanted to link all the containers in the same network so I don't have to deal with connections issues but it seems that I am missing something. The task is not running. Any advice will be helpfull.

标签: dockerelasticsearchapache-kafkadocker-compose

解决方案


After you use network it will not be localhost anymore. You need to use your service name as connection.url . Can you try connection.url=http://elasticsearch:9200 and maybe without http


推荐阅读