首页 > 解决方案 > 如何将带有大写键的 json 数据下沉到 Postgres 表中?

问题描述

假设我有一个包含 json 数据但没有架构的主题(用户)。

数据示例:

{"id":3151212170,"name":"John Wick"}

为了解决这个问题,我创建了一个流(user_stream)来从主题中获取数据并基本上给它一个模式。

create stream user_stream (id bigint, name string) with (kafka_topic='user', value_format='JSON', key = 'id');

然后使用我创建另一个流的数据:

create stream user_final with (value_format = 'AVRO') as select * from USER_STREAM;

注意:数据现在是带有架构的 Avro 格式,但列现在是大写的。

我正在使用 Kafka 的 JdbcSinkConnector 将数据接收到已经存在的 Postgres 表中。

Postgres 表示例:

create table mytable (id bigint primary key, name text)

接收器连接器配置:

{
    "name": "postgres-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "USER_FINAL",
        "key.converter.schema.registry.url": "http://schema-reg-url:8081",
        "value.converter.schema.registry.url": "http://schema-reg-url:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "connection.url": "jdbc:postgresql://postgres-url:5432/mydbname?user=username&password=password",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "true",
        "table.name.format": "mytable",
        "pk.mode": "none",
        "insert.mode": "insert"
    }
}

问题是流列名是大写的,而 Postgres 表的列名是小写的。即使我在 Postgres 中使用大写列名创建一个表,它也只是转换为小写。

错误:

org.postgresql.util.PSQLException: ERROR: column "ID" of relation "mytable" does not exist

有解决方法吗?我愿意接受建议。

标签: apache-kafkaapache-kafka-connectksqldb

解决方案


即使我在 Postgres 中使用大写列名创建一个表,它也只是转换为小写。

如果您的列名没有放在双引号中,PostgreSQL 会将它们转换为小写格式。

所以,

  • 引号使列名区分大小写
  • 而未加引号的列名总是折叠为小写

在 Kafka Connect 端,您可以使用Kafka Connect Single Message Transofrms (SMT)更改字段名称。

更准确地说,ReplaceField允许您重命名字段。例如,以下转换将分别替换列名COL1COL2tocol1col2

"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "COL1:col1,COL2:col2"

推荐阅读