首页 > 解决方案 > Kafka-Connect JDBC Sink 在 upsert 期间报告 null id

问题描述

我是 Kafka / Kafka Connect 的新手,我遇到了融合 JDBC 连接器的问题。目前我正在使用 Confluent Community docker compose。

我可以成功地创建一个从 mysql DB 读取到 kafka 的源。

curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "college_mysql_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "mode": "timestamp+incrementing", "timestamp.column.name": "updated_on", "topic.prefix": "college_mysql_", "poll.interval.ms": 1000, "table.whitelist": "college" } }' \
http://localhost:8083/connectors

数据按预期进入 Kafka,每一列都在 Avro 中正确表示。如果我通过 CLI 创建消费者,我可以看到数据是正确的。

{
    "id":112525,
    "pim_id":{"long":78806},
    "college_name":{"string":"Western University of Health Sciences"},
    ...
}

如果我创建一个简单的 JDBC 接收器将数据放入另一个 mysql 数据库,一切都很好:

curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "insert"}}' \
  http://localhost:8083/connectors

我们正确地创建了一个表,并且所有字段(包括 id)都正确填充了新记录。但是,如果我改为创建一个使用插入模式 upsert 的接收器,我开始出现错误。

curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id"}}' \
  http://localhost:8083/connectors

这确实正确地创建了表并正确地建立id了主键,到目前为止一切都很好,但是现在每当它从主题中读取时,我们都会得到一个错误:

java.sql.BatchUpdateException: Column 'id' cannot be null

这就是我卡住的地方。主题中的数据正确地有一个 ID 字段,如果我没有将该列声明为 PK,则该 ID 字段用于 ID 列。我尝试自己定义表,而不是允许接收器创建表,我认为表创建可能有一些奇怪的问题,但似乎并非如此,我得到了完全相同的错误。对此的任何建议或方向将不胜感激,我希望解决方案很简单,我只是遗漏了一些对那些有更多经验的人来说显而易见的东西,可以向我指出。

谢谢!

标签: jdbcapache-kafkaapache-kafka-connect

解决方案


您需要设置“pk.mode”:“record_value”</p>


推荐阅读