Kafka JDBCSinkConnector schema exception: JsonConverter with schemas.enable requires "schema" and "payload"

Problem description

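I am trying to transfer data from a Kafka topic to Postgres using the JDBCSinkConnector. After all the steps (creating the topic, creating the stream, creating the sink connector with its configuration, and producing data into the topic through Python), the Connect log returns the following: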

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration

Here is the JSON schema (sch.json):

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "url"
            }
        ],
        "optional": false,
        "name": "test_data"
    },
    "payload": {
        "id": 12,
        "url": "some_url"
    }
}

Here is the kafka-connect configuration:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgre-01/config \
     -H "Content-Type: application/json" -d '{
    "connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url"                     : "jdbc:postgresql://postgres:5432/",
    "topics"                             : "test_topic06",
    "key.converter"                      : "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable"       : "true",
    "value.converter"                    : "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable"     : "true",  
    "connection.user"                    : "postgres",
    "connection.password"                : "*******",
    "auto.create"                        : true,
    "auto.evolve"                        : true,
    "insert.mode"                        : "insert",
    "pk.mode"                            : "record_key",
    "pk.fields"                          : "MESSAGE_KEY"
}'

Here is the Python code that produces data to Kafka:

from kafka import KafkaProducer
import json 
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'))
with open("../data/sch.json", 'r') as file:
    read = file.read()
    for i in range(1):
        producer.send("test_topic06", value=read)
producer.close()
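
The shape that the error message describes can be checked locally before anything is produced. The following is a minimal sketch, assuming the rule is exactly as worded in the error (a JSON object whose only top-level fields are "schema" and "payload"). `has_connect_envelope` is a hypothetical helper name, not part of Kafka or kafka-python, and it does not reproduce the converter's actual implementation. The second check illustrates one thing worth verifying in a producer like the one above: text that is already JSON and is passed through json.dumps again becomes a JSON string literal rather than an object.

```python
import json


def has_connect_envelope(raw: bytes) -> bool:
    """Return True if raw decodes to a JSON object whose only top-level
    fields are "schema" and "payload" (hypothetical helper)."""
    try:
        doc = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}


# Same content as sch.json, written as a Python dict.
envelope = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int32", "optional": False, "field": "id"},
            {"type": "string", "optional": False, "field": "url"},
        ],
        "optional": False,
        "name": "test_data",
    },
    "payload": {"id": 12, "url": "some_url"},
}

# A dict serialized once is a JSON object with the two expected fields.
print(has_connect_envelope(json.dumps(envelope).encode("utf-8")))

# Serializing an already-serialized string wraps it in a JSON string literal,
# so the top-level value is no longer an object.
text = json.dumps(envelope)
print(has_connect_envelope(json.dumps(text).encode("utf-8")))
```

The first call prints True and the second prints False.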

I then tried setting "key.converter.schemas.enable" and "value.converter.schemas.enable" to false, but the log showed the same result.

Full log:

connect            | [2021-04-01 09:20:41,342] INFO MonitoringInterceptorConfig values: 
connect            |    confluent.monitoring.interceptor.publishMs = 15000
connect            |    confluent.monitoring.interceptor.topic = _confluent-monitoring
connect            |  (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig)
connect            | [2021-04-01 09:20:41,344] INFO ProducerConfig values: 
connect            |    acks = -1
connect            |    batch.size = 16384
connect            |    bootstrap.servers = [broker:29092]
connect            |    buffer.memory = 33554432
connect            |    client.dns.lookup = default
connect            |    client.id = confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0
connect            |    compression.type = lz4
connect            |    connections.max.idle.ms = 540000
connect            |    delivery.timeout.ms = 120000
connect            |    enable.idempotence = false
connect            |    interceptor.classes = []
connect            |    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect            |    linger.ms = 500
connect            |    max.block.ms = 60000
connect            |    max.in.flight.requests.per.connection = 1
connect            |    max.request.size = 10485760
connect            |    metadata.max.age.ms = 300000
connect            |    metadata.max.idle.ms = 300000
connect            |    metric.reporters = []
connect            |    metrics.num.samples = 2
connect            |    metrics.recording.level = INFO
connect            |    metrics.sample.window.ms = 30000
connect            |    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
connect            |    receive.buffer.bytes = 32768
connect            |    reconnect.backoff.max.ms = 1000
connect            |    reconnect.backoff.ms = 50
connect            |    request.timeout.ms = 30000
connect            |    retries = 10
connect            |    retry.backoff.ms = 500
connect            |    sasl.client.callback.handler.class = null
connect            |    sasl.jaas.config = null
connect            |    sasl.kerberos.kinit.cmd = /usr/bin/kinit
connect            |    sasl.kerberos.min.time.before.relogin = 60000
connect            |    sasl.kerberos.service.name = null
connect            |    sasl.kerberos.ticket.renew.jitter = 0.05
connect            |    sasl.kerberos.ticket.renew.window.factor = 0.8
connect            |    sasl.login.callback.handler.class = null
connect            |    sasl.login.class = null
connect            |    sasl.login.refresh.buffer.seconds = 300
connect            |    sasl.login.refresh.min.period.seconds = 60
connect            |    sasl.login.refresh.window.factor = 0.8
connect            |    sasl.login.refresh.window.jitter = 0.05
connect            |    sasl.mechanism = GSSAPI
connect            |    security.protocol = PLAINTEXT
connect            |    security.providers = null
connect            |    send.buffer.bytes = 131072
connect            |    ssl.cipher.suites = null
connect            |    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
connect            |    ssl.endpoint.identification.algorithm = https
connect            |    ssl.key.password = null
connect            |    ssl.keymanager.algorithm = SunX509
connect            |    ssl.keystore.location = null
connect            |    ssl.keystore.password = null
connect            |    ssl.keystore.type = JKS
connect            |    ssl.protocol = TLS
connect            |    ssl.provider = null
connect            |    ssl.secure.random.implementation = null
connect            |    ssl.trustmanager.algorithm = PKIX
connect            |    ssl.truststore.location = null
connect            |    ssl.truststore.password = null
connect            |    ssl.truststore.type = JKS
connect            |    transaction.timeout.ms = 60000
connect            |    transactional.id = null
connect            |    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect            |  (org.apache.kafka.clients.producer.ProducerConfig)
connect            | [2021-04-01 09:20:41,349] INFO Kafka version: 5.5.0-ce (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO Kafka commitId: 6068e5d52c5e294e (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO Kafka startTimeMs: 1617268841349 (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0 created for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,361] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect            | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
connect            |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
connect            |    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            |    ... 13 more
connect            | [2021-04-01 09:20:41,363] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect            | [2021-04-01 09:20:41,364] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
connect            | [2021-04-01 09:20:41,366] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Cluster ID: K4nfs8sOSWCoI2_jEFzZ1Q (org.apache.kafka.clients.Metadata)
connect            | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Revoke previously assigned partitions test_topic06-2, test_topic06-0, test_topic06-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Member connector-consumer-sink-jdbc-postgre-01-0-a6013ad5-a778-4372-a9ab-a0c77119150b sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-04-01 09:20:41,379] INFO Publish thread interrupted for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,396] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,397] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
connect            | [2021-04-01 09:20:41,403] INFO Closed monitoring interceptor for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

Tags: postgresql, jdbc, apache-kafka, apache-kafka-connect

Solution


You are configuring the connector to parse a JSON key:

"key.converter"                      : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable"       : "true",

but you are not sending any key:

producer.send("test_topic06", value=read)

You can either

  • set key.converter to org.apache.kafka.connect.storage.StringConverter

or

  • also pass a key with the same {schema: {}, payload: {}} structure:
producer.send("test_topic06", key=key_value, value=read)
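
Both options can be sketched in Python. This is a minimal sketch under stated assumptions, not a definitive fix: `with_string_key_converter`, `key_envelope`, `to_json_bytes` and `send_with_key` are hypothetical helper names, and the plain `string` key schema is an illustrative choice. The kafka-python producer is only created inside `send_with_key`, which needs a reachable broker at the address used in the question. The value file is parsed with json.load so that the serializer receives a dict and encodes it exactly once.

```python
import json


def with_string_key_converter(config: dict) -> dict:
    """Option 1: return a copy of the connector config that uses
    StringConverter for keys."""
    updated = dict(config)
    updated["key.converter"] = "org.apache.kafka.connect.storage.StringConverter"
    # schemas.enable is a JsonConverter setting, so drop it for the key.
    updated.pop("key.converter.schemas.enable", None)
    return updated


def key_envelope(key: str) -> dict:
    """Option 2: wrap a string key in the schema/payload structure."""
    return {"schema": {"type": "string", "optional": False}, "payload": key}


def to_json_bytes(obj) -> bytes:
    """Serialize a Python object to UTF-8 encoded JSON exactly once."""
    return json.dumps(obj).encode("utf-8")


def send_with_key(path: str, key: str, topic: str = "test_topic06") -> None:
    """Send the envelope stored at path together with an enveloped key.
    Requires kafka-python and a running broker."""
    from kafka import KafkaProducer  # imported here so the helpers work without it

    with open(path, "r") as file:
        value = json.load(file)  # a dict, so it is encoded only once below

    producer = KafkaProducer(
        bootstrap_servers=["127.0.0.1:9092"],
        key_serializer=to_json_bytes,
        value_serializer=to_json_bytes,
    )
    producer.send(topic, key=key_envelope(key), value=value)
    producer.flush()
    producer.close()


# The pure helpers can be inspected without a broker.
print(to_json_bytes(key_envelope("12")).decode("utf-8"))
```

With a broker running, `send_with_key("../data/sch.json", key="12")` would produce one record whose key and value both carry the schema/payload structure; the key value "12" is only an example.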
