Kafka Connect JDBC source: null value for field that is required and has no default value

Problem description

I am trying to set up a Kafka Connect JDBC source connector with PostgreSQL, but I get the following error:

org.apache.kafka.connect.errors.DataException: Conversion error: null value for field that is required and has no default value
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:556)
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:650)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:537)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:290)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Here is the configuration:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "incrementing.column.name": "id",
  "tasks.max": "1",
  "query": "SELECT * FROM comments",
  "table.whitelist": "comments",
  "mode": "incrementing",
  "key.converter.schemas.enable": "true",
  "topic.prefix": "comments_topic",
  "value.converter.schemas.enable": "true",
  "name": "JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://db:5432/my-db?user=postgres&password=password",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}

Any idea what the problem might be?

I also sometimes get this error:

org.apache.kafka.connect.errors.DataException: Found null value for non-optional schema
    at io.confluent.connect.avro.AvroData.validateSchemaValue(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:359)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:532)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:76)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:168)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Tags: apache-kafka, apache-kafka-connect

Solution
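
Both messages above describe the same kind of condition: a null value arrived in a field whose Connect schema is marked required (non-optional) and has no default value. The Python sketch below is a simplified model of that rule, offered as a diagnostic aid rather than a confirmed fix. It is not the actual JsonConverter or AvroData code, and `FieldSchema`, `convert_field`, `find_offending_fields`, and the `body` column are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


class DataException(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


@dataclass(frozen=True)
class FieldSchema:
    """Simplified model of one field in a Connect schema (hypothetical)."""
    name: str
    optional: bool = False   # False means the field is required
    default: Any = None      # None means no default value is declared


def convert_field(schema: FieldSchema, value: Any) -> Any:
    """Apply the null-handling rule that the error message describes."""
    if value is not None:
        return value
    if schema.default is not None:
        # A declared default replaces the null.
        return schema.default
    if schema.optional:
        # Optional fields may carry a null.
        return None
    raise DataException(
        "Conversion error: null value for field that is required "
        f"and has no default value (field: {schema.name})"
    )


def find_offending_fields(schemas: List[FieldSchema],
                          row: Dict[str, Any]) -> List[str]:
    """Return the names of fields in `row` that would trigger the error."""
    offending = []
    for schema in schemas:
        try:
            convert_field(schema, row.get(schema.name))
        except DataException:
            offending.append(schema.name)
    return offending


if __name__ == "__main__":
    # Assumed columns: `id` comes from the configuration above,
    # `body` is a made-up nullable column.
    schemas = [FieldSchema("id"), FieldSchema("body", optional=True)]
    print(find_offending_fields(schemas, {"id": None, "body": None}))
```

Applying a check like this to a few rows returned by the query would show which columns deliver nulls despite being treated as required, which narrows down where to look next.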

