首页 > 解决方案 > kafka connect JdbcSourceConnector 反序列化问题

问题描述

我正在使用 kafka connect 连接到数据库,以便存储有关压缩主题的信息,并且在尝试在 Spring Cloud Stream 应用程序中使用该主题时遇到反序列化问题。

连接器配置:

    {
  "name": "my-connector",
  "config": {
    "name": "my-connector",
    "poll.interval.ms": "86400000",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "oracle-jdbc-string",
    "connection.user": "testid",
    "connection.password": "test",
    "catalog.pattern": "mySchema",
    "table.whitelist": "MY_TABLE",
    "table.types": "TABLE",
    "mode": "bulk",
    "numeric.mapping": "best_fit",
    "transforms": "createKey, extractCode, UpdateTopicName",
    "transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractCode.field": "ID",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ID",
    "transforms.UpdateTopicName.regex": "(.*)",
    "transforms.UpdateTopicName.replacement": "my_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topic.prefix": "nt_mny_"
  }
}

连接器似乎工作正常,并在主题上放置了适当的消息,并且在使用 kafka-console-consumer 时,示例消息看起来像这样

kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning  --property print.key=true | jq '.'

7247
0
{
  "ID": 7247,
  "USER_SK": 5623,
  "TYP_CDE": "TEST",
  "ALRT_ACTIVE_FLAG": "Y",
  "ALRT_DESC": "My Alert",
  "ALRT_STATUS": "VISIBLE",
  "CREAT_BY": "ME",
  "CREAT_TM": 1593547299565,
  "UPD_BY": "ME",
  "UPD_TM": 1593547299565
}

我想知道键和值之间打印的 0 是问题还是只是卡夫卡噪音。

我在代码中看到的问题是

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)

我的处理器/接收器代码相对简单。

@StreamListener
public void process(
    @Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {

    myAlertKconnectStream.peek((key,value) -> {
        System.out.println("HELOOOOOO");
        logger.debug("ece/pre: key={}, value={}",key,value);});

}

我花了几天的时间试图弄清楚这一点,但几乎没有任何帮助!

标签: javaapache-kafkaapache-kafka-connectspring-cloud-stream

解决方案


您使用的是 JSON 模式转换器 ( io.confluent.connect.json.JsonSchemaConverter),而不是 JSON 转换器 ( org.apache.kafka.connect.json.JsonConverter)。

JSON Schema 转换器使用 Schema Registry 来存储模式,并将有关它的信息放在消息的前面几个字节中。这就是使您的代码(Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7))绊倒的原因。

所以要么在你的代码中使用JSON Schema 反序列化器(更好),要么切换到使用org.apache.kafka.connect.json.JsonConverter转换器(不太可取;然后你扔掉模式)。

更多详情:https ://rmoff.net/2020/07/03/why-json-isnt-the-same-as-json-schema-in-kafka-connect-converters-and-ksqldb-viewing-kafka-messages -bytes-as-hex/


推荐阅读