Kafka JDBC Sink Connector with JSON Schema Registry

Problem

I have a JDBC sink connector writing to PostgreSQL, configured as follows.

{
    "name": "connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://localhost:5432/postgres?user=user&password=123",
        "topics": "kafka-to-jsonb31",
        "insert.mode": "insert",
        "dialect.name":"PostgreSqlDatabaseDialect",
        "table.name.format":"sample_jsonb",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "true",
        "value.converter.schema.registry.url": "http://localhost:8090/json-schema.txt",
        "auto.create":"false",
        "auto.evolve":"false",
        "pk.mode":"none",
        "pk.fields":"none",
        "transforms": "timestamp",
        "transforms.timestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.timestamp.target.type": "Timestamp",
        "transforms.timestamp.field": "timestamp",
        "transforms.timestamp.format": "yyyy-MM-dd HH:mm:ss"
    }
}

This works fine if I include the schema in each Kafka message, but I would like to keep the schema definition in a separate file and send only the actual payload to the connector. For that I tried

 "value.converter.schema.registry.url": "http://localhost:8090/json-schema.txt",

but it looks like the connector is unable to fetch the schema, and it throws the following exception:

org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
        at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)

Here is the message I would like the connector to receive:

{
    "data": "{\"name\":\"name11\",\"age\":26}",
    "timestamp": "2021-08-09 17:39:00"
}

And here is the content of the schema file:

{
    "schema": {
        "type": "struct",
        "fields": [{
            "field": "data",
            "type": "string",
            "optional": false
        }, {
            "field": "timestamp",
            "type": "string",
            "optional": false
        }]
    }
}
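For reference, the exception above occurs because the plain JsonConverter with schemas.enable=true expects every single message to carry the schema and the payload together in one envelope. A minimal sketch of what one accepted message would look like, combining the schema file above with the sample payload (values taken from the question):

```python
import json

# Envelope required by org.apache.kafka.connect.json.JsonConverter when
# value.converter.schemas.enable=true: each record must contain exactly
# the "schema" and "payload" fields, with no extra top-level keys.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "data", "type": "string", "optional": False},
            {"field": "timestamp", "type": "string", "optional": False},
        ],
    },
    "payload": {
        # "data" holds the inner JSON document as an escaped string,
        # matching the "string" type declared in the schema.
        "data": json.dumps({"name": "name11", "age": 26}),
        "timestamp": "2021-08-09 17:39:00",
    },
}

print(json.dumps(message))
```

This is exactly the duplication the question is trying to avoid, which is what motivates the registry-based converter in the solution below.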

Tags: apache-kafka, apache-kafka-connect, confluent-platform

Solution


value.converter.schema.registry.url should contain only the base URL of the Schema Registry: "value.converter.schema.registry.url": "http://localhost:8090/"

As long as the schema is already registered in your Schema Registry, the converter can find it for the topic via the subject name strategy (by default, TopicNameStrategy, which maps the value schema of topic kafka-to-jsonb31 to the subject kafka-to-jsonb31-value).
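A sketch of registering the schema from the question under that subject via the Schema Registry REST API (the registry address is an assumption taken from the question's config, and the JSON Schema body is a hand-translated equivalent of the struct schema above):

```python
import json

REGISTRY_URL = "http://localhost:8090"  # assumed registry address from the question
TOPIC = "kafka-to-jsonb31"

# Default TopicNameStrategy: value schemas live under "<topic>-value".
subject = f"{TOPIC}-value"

# JSON Schema equivalent of the struct in the question's schema file.
value_schema = {
    "type": "object",
    "properties": {
        "data": {"type": "string"},
        "timestamp": {"type": "string"},
    },
    "required": ["data", "timestamp"],
}

# Request body for POST /subjects/<subject>/versions: the schema itself
# is passed as an escaped string, and schemaType marks it as JSON Schema
# (the registry assumes Avro when schemaType is omitted).
body = json.dumps({"schemaType": "JSON", "schema": json.dumps(value_schema)})
url = f"{REGISTRY_URL}/subjects/{subject}/versions"
# Send with: Content-Type: application/vnd.schemaregistry.v1+json
```

Once this registration succeeds, the converter can resolve the schema by ID and messages only need to carry the payload.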

In addition, you must use the converter value.converter=io.confluent.connect.json.JsonSchemaConverter instead of org.apache.kafka.connect.json.JsonConverter. The former is what allows you to use the Schema Registry with JSON messages; note that the producer must then write records with the matching JSON Schema serializer, which prefixes each record with the registered schema's ID.
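Putting the two fixes together, the value-converter portion of the connector config would look roughly like this (a sketch; the registry port is taken from the question, and the schemas.enable properties are dropped because they belong to the plain JsonConverter, not JsonSchemaConverter):

```json
{
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://localhost:8090/"
}
```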

JsonConverter, by contrast, requires the schema to be embedded in every message, which is exactly the problem you are currently facing. See this article for more details.

