首页 > 解决方案 > kafka 连接器 jdbc-sink 最后出现语法错误

问题描述

我对这个拱门有一个关于 jdbc-sink 的问题。

postgres1 ---> kafka ---> postgres2

生产者工作正常,但消费者有一个错误:

连接_1 | org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "customers" ("id") VALUES (1) ON CONFLICT ("id") DO UPDATE SET 被中止:错误:输入 connect_1 末尾的语法错误 |
位置:77 调用 getNextException 查看批处理中的其他错误。

这是我的 source.json

{
"name": "src-table",
"config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres1_container",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.whitelist": "postgres",
    "database.server.name": "postgres1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
}

这是我的 jdbc-sink.json

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "customers",
        "connection.url": "jdbc:postgresql://postgres2_container:5432/postgres?user=postgres&password=postgres",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "auto.create": "true",
        "insert.mode": "upsert",
        "pk.fields": "id",
        "pk.mode": "record_value"
    }
}

debezium/动物园管理员:0.9

debezium/kafka:0.9

debezium/postgres:9.6

debezium/连接:0.9

PostgreSQL JDBC 驱动程序 42.2.5

卡夫卡连接 JDBC 5.2.1

我试图降级 jdbc 驱动程序和融合 kafka 连接,但仍然有同样的错误

标签: postgresqljdbcapache-kafkadocker-composedebezium

解决方案


解决问题,因为当我在 postgres1 中创建表时,我没有将idPK 值设置为


推荐阅读