postgresql - 无法使用 Kafka JDBC Sink 连接器将数据加载到 Postgres
问题描述
我使用 debezium 源连接器将数据从 RDS postgres 带到了 kafka 主题。主题中的数据如下所示:
{"domain":"domain-new-34B","profile":"2423947a-asf23424","account":"aasdfadf","customer":"gaf23sdf","profileno":"324","user":"234234","updatedat":233463463456,"__deleted":"false"}
我正在使用 Kafka JDBC 接收器连接器将数据发送到 Cloudsql Postgres。我的接收器连接器 .json 文件如下所示:
{
"name":"postgres-sink-connector",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":1,
"auto.create":true,
"connection.url":"jdbc:postgresql://host:5432/testdb",
"connection.user":"user1",
"connection.password":"user123",
"topics":"server1.public.topic1",
"auto.create":"true",
"auto.evolve":"true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": ".",
"table.name.format": "${topic}",
"transforms": "route",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$2_$3",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms": "unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
}
}
发布连接器时出现以下错误:
java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='server1.public.topic1',partition=0,offset=0,timestamp=1212312441756) with a HashMap value and null value schema.
我还没有创建目标表,我希望它是自动创建的。
解决方案
推荐阅读
- qt - 交叉编译到树莓派时 JPEG 图像质量下降
- c# - 如何映射另一个表中的列?
- java - 如何从一个div下的不同元素中获取所有文本
- php - 如何在两个 DIV 之间进行嵌套的 For Loop 拆分输出?
- apache-kafka - 在 Ubuntu 的本地设置 kafka-connect 以与 debezium 和 WSO2 流处理器一起使用?
- numpy - 如何使用带有熊猫的随机森林来使用特征重要性?
- java - 如何使用 Spark 并行读取文件夹中具有不同 rowTag 的 xml 文件
- docker - docker 可以使用 http 请求从私有注册表中提取图像吗?
- node.js - 如何在 Node Js API 中添加身份验证?
- c# - 如果我在文本框 Asp.net MVC 上输入 carno,结果不显示