apache-kafka-connect - Confluent InfluxDB 接收器连接器
问题描述
我正在尝试使用 Confluent InfluxDB 接收器连接器将主题中的数据获取到我的 InfluxDB 中。配置如下所示:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=https://mydb
topics=mytopic
tasks.max=1
当我通过 Kafka Connect UI 创建新连接器时,我得到的只是以下异常:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.kafka.connect.data.Struct
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:109)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
主题中的值是这样的 json 字符串:{"pid":1,"filename":"test1.csv"}
. 我在这里缺少任何配置吗?
更新:这是我的工人配置:
config.storage.topic=kafka-connect-my-config
rest.port=28082
group.id=kafka-connect-mygroup
plugin.path=/usr/share/java,/connect-plugins
key.converter=org.apache.kafka.connect.storage.StringConverter
offset.storage.topic=kafka-connect-my-offsets
bootstrap.servers={my broker urls}
value.converter=org.apache.kafka.connect.storage.StringConverter
status.storage.topic=kafka-connect-my-status
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.advertised.host.name=kafka-development-kafka-connect-1
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
解决方案
InfluxDB 连接器要求数据中存在模式,因此如果您有 JSON 数据,则需要设置
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true
但是您的 JSON 需要包含架构,所以不是
{"pid":1,"filename":"test1.csv"}
你需要类似的东西
{
"schema": {
"type": "struct", "optional": false, "version": 1, "fields": [
{ "field": "pid", "type": "string", "optional": true },
{ "field": "filename", "type": "string", "optional": true }
]
},
"payload": {
"pid": 1,
"filename": "test1.csv"
}
}
参考:https ://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/
有关如何将架构应用于数据的更多详细信息,请参阅此博客
有关一般转换器的更多详细信息,请参阅本文。
推荐阅读
- django - 如何使用临时默认值填充 Django 外键?
- css - 滚动页面时如何修复顶部 div 和底部?
- azure - 如何将社交身份提供程序按钮添加到 Azure B2C 中的 selfAsserted.html 页面
- javascript - Window.open - 如何打开另一个文件的内容?
- java - JAR 的输入文件
- c++ - C++ 中的事件系统和移动语义
- r - 有没有办法自动化下面的例子
- python - 如何使用纸浆在 python 中不显示任何消息 model.solve()?
- javascript - 处理ajax错误并执行promise链
- laravel - 类型错误:this.axios 未定义