jdbc - Kafka-Connect JDBC Sink 在 upsert 期间报告 null id
问题描述
我是 Kafka / Kafka Connect 的新手,我遇到了融合 JDBC 连接器的问题。目前我正在使用 Confluent Community docker compose。
我可以成功地创建一个从 mysql DB 读取到 kafka 的源。
curl -X POST \
-H "Content-Type: application/json" \
--data '{ "name": "college_mysql_source", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": 1, "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "mode": "timestamp+incrementing", "timestamp.column.name": "updated_on", "topic.prefix": "college_mysql_", "poll.interval.ms": 1000, "table.whitelist": "college" } }' \
http://localhost:8083/connectors
数据按预期进入 Kafka,每一列都在 Avro 中正确表示。如果我通过 CLI 创建消费者,我可以看到数据是正确的。
{
"id":112525,
"pim_id":{"long":78806},
"college_name":{"string":"Western University of Health Sciences"},
...
}
如果我创建一个简单的 JDBC 接收器将数据放入另一个 mysql 数据库,一切都很好:
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "insert"}}' \
http://localhost:8083/connectors
我们正确地创建了一个表,并且所有字段(包括 id)都正确填充了新记录。但是,如果我改为创建一个使用插入模式 upsert 的接收器,我开始出现错误。
curl -X POST -H "Content-Type: application/json" \
--data '{"name": "weighted_average_mysql_sink_college", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"college_mysql_college", "connection.url": "jdbc:mysql://host.docker.internal:3306/...", "auto.create": "true", "insert.mode": "upsert", "pk.mode": "record_key", "pk.fields": "id"}}' \
http://localhost:8083/connectors
这确实正确地创建了表并正确地建立id
了主键,到目前为止一切都很好,但是现在每当它从主题中读取时,我们都会得到一个错误:
java.sql.BatchUpdateException: Column 'id' cannot be null
这就是我卡住的地方。主题中的数据正确地有一个 ID 字段,如果我没有将该列声明为 PK,则该 ID 字段用于 ID 列。我尝试自己定义表,而不是允许接收器创建表,我认为表创建可能有一些奇怪的问题,但似乎并非如此,我得到了完全相同的错误。对此的任何建议或方向将不胜感激,我希望解决方案很简单,我只是遗漏了一些对那些有更多经验的人来说显而易见的东西,可以向我指出。
谢谢!
解决方案
您需要设置“pk.mode”:“record_value”</p>
推荐阅读
- php - 如何从 ACF 日期选择器月份获取前 3 个字符?
- javascript - 如果arraylist中有重复值,如何删除重复项和导致javascript中重复的第一项
- javascript - javascript和node中的移动自动化框架设计问题
- javascript - cordova 捕获音频,并获取文件路径
- node.js - [NodeJS]:在路由之后重定向>在重定向之前调用的函数>我们可以在函数调用之后重定向吗?
- bash - 无法通过 shell 脚本运行 mvn 目标
- ios - iOS 双卡,蜂窝数据的默认 sim 卡
- mysql - 比较两个 MySQL 表并在第二个表中选择不同的条目
- xamarin - Xamarin UI 文本 - 如何在 UITextView 中利用属性字符串(超链接)
- java - 从 Fuseki 获取 prefixMap