hadoop - How do I configure the HdfsSinkConnector with Kafka Connect?
Problem description
I am trying to set up an HdfsSinkConnector. Here is my worker.properties configuration:
bootstrap.servers=kafkacluster01.corp:9092
group.id=nycd-og-kafkacluster
config.storage.topic=hive_conn_conf
offset.storage.topic=hive_conn_offs
status.storage.topic=hive_conn_stat
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://my-schemaregistry.co:8081
schema.registry.url=http://my-schemaregistry.co:8081
hive.integration=true
hive.metastore.uris=dev-hive-metastore
schema.compatibility=BACKWARD
value.converter.schemas.enable=true
logs.dir=/logs
topics.dir=/topics
plugin.path=/usr/share/java
Here is the POST request I make to create the connector:
curl -X POST localhost:9092/connectors -H "Content-Type: application/json" -d '{
"name":"hdfs-hive_sink_con_dom16",
"config":{
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"topics": "dom_topic",
"hdfs.url": "hdfs://hadoop-sql-dev:10000",
"flush.size": "3",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://my-schemaregistry.co:8081"
}
}'
The topic dom_topic already exists (and is Avro), but I get the following error from my worker:
INFO Couldn't start HdfsSinkConnector: (io.confluent.connect.hdfs.HdfsSinkTask:72)
org.apache.kafka.connect.errors.ConnectException: java.io.IOException:
Failed on local exception: com.google.protobuf.InvalidProtocolBufferException:
Protocol message end-group tag did not match expected tag.;
Host Details : local host is: "319dc5d70884/172.17.0.2"; destination host is: "hadoop-sql-dev":10000;
at io.confluent.connect.hdfs.DataWriter.<init>(DataWriter.java:202)
at io.confluent.connect.hdfs.HdfsSinkTask.start(HdfsSinkTask.java:64)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:207)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
The hdfs.url value I got from Hive is: jdbc:hive2://hadoop-sql-dev:10000
If I change the port to 9092, I get:
INFO Retrying connect to server: hadoop-sql-dev/xxx.xx.x.xx:9092. Already tried 0 time(s); maxRetries=45 (org.apache.hadoop.ipc.Client:837)
I am running all of this in Docker, and my Dockerfile is very simple:
#FROM coinsmith/cp-kafka-connect-hdfs
FROM confluentinc/cp-kafka-connect:5.3.1
COPY confluentinc-kafka-connect-hdfs-5.3.1 /usr/share/java/kafka-connect-hdfs
COPY worker.properties worker.properties
# start
ENTRYPOINT ["connect-distributed", "worker.properties"]
Any help would be appreciated.
Solution
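The answer body is missing from this page, so here is a hedged reading of the error. The `InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag` raised by the Hadoop IPC client typically means the client spoke the Hadoop RPC protocol to a service that answers with something else entirely: port 10000 is HiveServer2's Thrift/JDBC port (which is why Hive reported a `jdbc:hive2://` URL), and port 9092 is the Kafka broker, so neither is the HDFS NameNode. `hdfs.url` should point at the NameNode RPC endpoint, i.e. the value of `fs.defaultFS` in the cluster's `core-site.xml` (commonly port 8020, sometimes 9000). Separately, the POST should go to the Kafka Connect REST listener (8083 by default), not the broker port 9092. A corrected request might look like this; the NameNode port 8020 is an assumption to verify against your own `core-site.xml`:

```shell
# POST to the Kafka Connect REST API (default port 8083), not the broker (9092).
# hdfs.url must be the NameNode RPC endpoint (fs.defaultFS), not HiveServer2 (10000).
curl -X POST localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "hdfs-hive_sink_con_dom16",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "topics": "dom_topic",
    "hdfs.url": "hdfs://hadoop-sql-dev:8020",
    "flush.size": "3",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://my-schemaregistry.co:8081"
  }
}'
```

Note also that `hive.integration`, `hive.metastore.uris`, and `schema.compatibility` are connector-level settings for the HDFS connector, so they would normally go in the connector JSON above rather than in worker.properties, and `hive.metastore.uris` usually takes a full Thrift URI (e.g. `thrift://dev-hive-metastore:9083`) rather than a bare hostname.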