How do I configure the HdfsSinkConnector with Kafka Connect?

Problem description

I am trying to set up an HdfsSinkConnector. This is my worker.properties configuration:

bootstrap.servers=kafkacluster01.corp:9092
group.id=nycd-og-kafkacluster

config.storage.topic=hive_conn_conf
offset.storage.topic=hive_conn_offs
status.storage.topic=hive_conn_stat

key.converter=org.apache.kafka.connect.storage.StringConverter

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://my-schemaregistry.co:8081

schema.registry.url=http://my-schemaregistry.co:8081

hive.integration=true
hive.metastore.uris=dev-hive-metastore
schema.compatibility=BACKWARD

value.converter.schemas.enable=true
logs.dir = /logs
topics.dir = /topics

plugin.path=/usr/share/java
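As a sanity check once the worker is up, the Connect REST API can confirm that the HDFS plugin was actually picked up from plugin.path. A minimal sketch, assuming the default Connect REST port 8083 (nothing in the properties above overrides rest.port):

curl localhost:8083/connector-plugins

The response should list io.confluent.connect.hdfs.HdfsSinkConnector among the installed plugins.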

This is the POST request I make to create the connector:

curl -X POST localhost:9092/connectors -H "Content-Type: application/json" -d '{
  "name":"hdfs-hive_sink_con_dom16",
  "config":{
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "dom_topic",
    "hdfs.url": "hdfs://hadoop-sql-dev:10000",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url":"http://my-schemaregistry.co:8081"
    }
}'    
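After the POST is accepted, the connector and task state can be inspected through the same REST API. A minimal sketch, again assuming the default port 8083:

curl localhost:8083/connectors/hdfs-hive_sink_con_dom16/status

The status endpoint reports RUNNING or FAILED for the connector and each task, including the failure trace.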

The topic dom_topic already exists (it is Avro), but I get the following error from my worker:

INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:72)
org.apache.kafka.connect.errors.ConnectException: java.io.IOException: 
Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: 
Protocol message end-group tag did not match expected tag.; 
Host Details : local host is: "319dc5d70884/172.17.0.2"; destination host is: "hadoop-sql-dev":10000;
        at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:202)
        at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:64)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

The hdfs.url I took from Hive is: jdbc:hive2://hadoop-sql-dev:10000

If I change the port to 9092, I get:

INFO Retrying connect to server: hadoop-sql-dev/xxx.xx.x.xx:9092. Already tried 0 time(s); maxRetries=45 (org.apache.hadoop.ipc.Client:837)

I am running all of this in Docker, and my Dockerfile is very simple:

#FROM coinsmith/cp-kafka-connect-hdfs
FROM confluentinc/cp-kafka-connect:5.3.1

COPY confluentinc-kafka-connect-hdfs-5.3.1 /usr/share/java/kafka-connect-hdfs
COPY worker.properties worker.properties

# start 
ENTRYPOINT ["connect-distributed", "worker.properties"]
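For completeness, a build-and-run sketch for this image (the tag is illustrative, and --network host assumes the broker, Schema Registry, and Hadoop hostnames resolve from the Docker host):

docker build -t connect-hdfs-worker .
docker run --rm --network host connect-hdfs-worker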

Any help would be appreciated.

Tags: hadoop, hive, apache-kafka, hdfs, apache-kafka-connect

Solution
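
The stack trace points at hdfs.url. jdbc:hive2://hadoop-sql-dev:10000 is the HiveServer2 JDBC endpoint; port 10000 speaks Hive's Thrift protocol, not Hadoop RPC, so when the connector's HDFS client opens a connection there the reply cannot be decoded as protobuf and Hadoop surfaces it as "Protocol message end-group tag did not match expected tag". Changing the port to 9092 hits the Kafka broker instead, which speaks yet another protocol, so the IPC client just keeps retrying. hdfs.url must point at the NameNode RPC endpoint, i.e. the fs.defaultFS value from the cluster's core-site.xml (commonly port 8020 or 9000).

Two related fixes:

- The POST goes to localhost:9092, which is the broker port; the Kafka Connect REST API listens on port 8083 by default.
- hive.integration, hive.metastore.uris, schema.compatibility, logs.dir, and topics.dir are HdfsSinkConnector properties rather than worker properties, so they belong in the connector JSON, and hive.metastore.uris needs a full Thrift URI.

A corrected request might look like the sketch below. The 8020 NameNode port and the thrift://dev-hive-metastore:9083 metastore URI are assumptions (9083 is the metastore default); substitute the actual values from your core-site.xml and hive-site.xml:

curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "hdfs-hive_sink_con_dom16",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "dom_topic",
    "hdfs.url": "hdfs://hadoop-sql-dev:8020",
    "flush.size": "3",
    "hive.integration": "true",
    "hive.metastore.uris": "thrift://dev-hive-metastore:9083",
    "schema.compatibility": "BACKWARD",
    "logs.dir": "/logs",
    "topics.dir": "/topics",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://my-schemaregistry.co:8081"
  }
}'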

