首页 > 解决方案 > 无法使用 Kafka JDBC Sink 连接器将数据加载到 Postgres

问题描述

我使用 debezium 源连接器将数据从 RDS postgres 带到了 kafka 主题。主题中的数据如下所示:

{"domain":"domain-new-34B","profile":"2423947a-asf23424","account":"aasdfadf","customer":"gaf23sdf","profileno":"324","user":"234234","updatedat":233463463456,"__deleted":"false"}

我正在使用 Kafka JDBC 接收器连接器将数据发送到 Cloudsql Postgres。我的接收器连接器 .json 文件如下所示:

{
"name":"postgres-sink-connector",
"config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max":1,
    "auto.create":true,
    "connection.url":"jdbc:postgresql://host:5432/testdb",
    "connection.user":"user1",
    "connection.password":"user123",
    "topics":"server1.public.topic1",
    "auto.create":"true",
    "auto.evolve":"true",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "key.converter":"org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "transforms": "flatten",
    "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
    "transforms.flatten.delimiter": ".",
    "table.name.format": "${topic}",
    "transforms": "route",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$2_$3",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms": "unwrap",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState"
}

}

发布连接器时出现以下错误:

java.base/java.lang.Thread.run(Thread.java:834)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Sink connector 'postgres-sink-connector' is configured with 'delete.enabled=false' and 'pk.mode=none' and therefore requires records with a non-null Struct value and non-null Struct schema, but found record at (topic='server1.public.topic1',partition=0,offset=0,timestamp=1212312441756) with a HashMap value and null value schema.

我还没有创建目标表,我希望它是自动创建的。

标签: postgresqlapache-kafkaapache-kafka-connect

解决方案


推荐阅读