首页 > 解决方案 > 连接 Confluent Kafka 和 InfluxDB 时出现 NullPointerException

问题描述

我正在尝试使用 Confluent InfluxDB Sink Connector将来自 kafka 主题的数据获取到我的 InfluxDB 中。

首先,我使用 nifi 从日志文件将数据传输到 kafka 主题,效果很好。kafka 主题获取数据,如下所示:

  {
    "topic": "testDB5",
    "key": null,
    "value": {
      "timestamp": "2019-03-20 01:24:29,461",
      "measurement": "INFO",
      "thread": "NiFi Web Server-795",
      "class": "org.apache.nifi.web.filter.RequestLogger",
      "message": "Attempting request for (anonymous) 
    },
    "partition": 0,
    "offset": 0
  }

然后,我通过 Kafka Connect UI 创建 InfluxDB 接收器连接器,我得到以下异常:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at io.confluent.influxdb.InfluxDBSinkTask.put(InfluxDBSinkTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    ... 10 more

但是,如果我通过使用手动将数据输入到另一个主题 testDB1

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic testDB1 --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"measurement","type":"string"},{"name":"timestamp","type":"string"}]}'

它有效,我的 influxDB 可以获取数据。

这是连接配置:

connector.class=io.confluent.influxdb.InfluxDBSinkConnector
influxdb.url=http://myurl
tasks.max=1
topics=testDB5

连接主题 testDB1 的配置除了主题名称相同。

nifi有什么问题吗?但它可以很好地将数据传输到主题。

标签: apache-nifiapache-kafka-connectconfluent-platform

解决方案


当您将 Avro 与 Kafka Connect 一起使用时,Avro 反序列化器期望数据已使用Avro 序列化器进行序列化。这就是kafak-avro-console-producer用途,这就是为什么当你使用它时你的管道可以工作。

本文为 Avro 和 Schema Registry 提供了很好的背景知识。另请参阅Kafka Connect Deep Dive – Converters and Serialization Explained

我不熟悉 Nifi,但查看文档似乎AvroRecordSetWriter可以选择使用Confluent Schema Registry。猜测您还想设置Schema Write StrategyConfluent Schema Registry Reference.

一旦您可以使用主题中的数据,您kafka-avro-console-consumer就知道它已正确序列化并且可以与您的 Kafka Connect 接收器一起使用。


推荐阅读