首页 > 解决方案 > PostgreSQL JDBC 接收器引发错误 null(Array) 类型没有到 SQL 数据库列类型的映射

问题描述

尝试使用 Kafka JDBC 接收器复制我的数据库时遇到问题。当我将服务器运行到具有 Array 数据类型的表时,它会出现此错误

...
Caused by: org.apache.kafka.connect.errors.ConnectException: null (ARRAY) type doesn't have a mapping to the SQL database column type
...

我想保留相同的数组条件,不想像我对 SQL Server 所做的那样将其转换为字符串(因为 SQL Server 不允许数组数据类型)。

这是我的连接配置:

{"name" :"pgsink_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max":"1",
            "topics":"'$table'",
            "connection.url":"jdbc:postgresql://",
            "connection.user":"",
            "connection.password":"",
            "transforms":"unwrap",
            "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones": "false",
            "delete.handling.mode":"drop",
            "auto.create":"true",
            "auto.evolve":"true",
            "insert.mode":"upsert",
            "pk.fields":" '$pk'",
            "pk.mode":"record_key",
            "delete.enabled":"true",
            "destination.table.format":"public.'$table'",
            "connection.attempts":"60",
            "connection.backoff.ms":"100000"

}}

我的 Kafka 源来自 Debezium,因为我想保留相同的数据类型,所以我没有将 SMT 放入我的源中。这是源配置:

{
"name":"pg_prod",
    "config":{
        "connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name":"wal2json_streaming",
        "database.hostname":"",
        "database.port":"",
        "database.user":"",
        "database.password":"",
        "database.dbname":"",
        "database.server.name":"",
    "database.history.kafka.bootstrap.servers": "",
    "database.history.kafka.topic": "",
        "transforms":"unwrap,reroute",
    "table.whitelist":"public.table",
    "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.unwrap.drop.tombstones": "false",
        "decimal.handling.mode":"double",
        "time.precision.mode":"connect",
    "transforms.reroute.type":"org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.reroute.regex":"postgres.public.(.*)",
    "transforms.reroute.replacement":"$1",
    "errors.tolerance": "all",
        "errors.log.enable":true,
        "errors.log.include.messages":true,
    "kafkaPartition": "0",
    "snapshot.delay.ms":"1000",
    "schema.refresh.mode":"columns_diff_exclude_unchanged_toast"
    }
}

标签: jdbcapache-kafkaapache-kafka-connectdebezium

解决方案


推荐阅读