apache-nifi - 连接 Confluent Kafka 和 InfluxDB 时出现 NullPointerException
问题描述
我正在尝试使用 Confluent InfluxDB Sink Connector将来自 kafka 主题的数据获取到我的 InfluxDB 中。
首先,我使用 nifi 从日志文件将数据传输到 kafka 主题,效果很好。kafka 主题获取数据,如下所示:
{
"topic": "testDB5",
"key": null,
"value": {
"timestamp": "2019-03-20 01:24:29,461",
"measurement": "INFO",
"thread": "NiFi Web Server-795",
"class": "org.apache.nifi.web.filter.RequestLogger",
"message": "Attempting request for (anonymous)
},
"partition": 0,
"offset": 0
}
然后,我通过 Kafka Connect UI 创建 InfluxDB 接收器连接器,我得到以下异常:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more
但是,如果我通过使用手动将数据输入到另一个主题 testDB1
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'
它有效,我的 influxDB 可以获取数据。
这是连接配置:
connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5
连接主题 testDB1 的配置除了主题名称相同。
nifi有什么问题吗?但它可以很好地将数据传输到主题。
解决方案
当您将 Avro 与 Kafka Connect 一起使用时,Avro 反序列化器期望数据已使用Avro 序列化器进行序列化。这就是kafak-avro-console-producer
用途,这就是为什么当你使用它时你的管道可以工作。
本文为 Avro 和 Schema Registry 提供了很好的背景知识。另请参阅Kafka Connect Deep Dive – Converters and Serialization Explained。
我不熟悉 Nifi,但查看文档似乎AvroRecordSetWriter可以选择使用Confluent Schema Registry。猜测您还想设置Schema Write Strategy
为Confluent Schema Registry Reference
.
一旦您可以使用主题中的数据,您kafka-avro-console-consumer
就知道它已正确序列化并且可以与您的 Kafka Connect 接收器一起使用。
推荐阅读
- python - 从 python 脚本调用 matlab 总是显示 matlab 信息,即使没有显示标志
- javascript - 如何在一定时间 HTML / CSS 后淡入和淡出一个 div
- php - SQLSTATE[HY000]:一般错误:1 个表帖子没有名为 * 的列
- code-formatting - 更漂亮:语法错误:只有 void 和外部元素可以自关闭
- xpages - 列数未知的 Xpages 动态表
- flutter - 我可以溢出 Container 中的小部件以使其像剪辑一样吗?
- computational-geometry - 使用 CGAL 的凸壳平面的可能错误方程
- laravel - VPS 上的 Voyager 管理菜单为空
- javascript - 在 useEffect React hook 中在不同时间实现多个 Firestore 调用
- c - 做..虽然循环不会停止等待 uinput