java - kafka connect JdbcSourceConnector 反序列化问题
问题描述
我正在使用 kafka connect 连接到数据库,以便存储有关压缩主题的信息,并且在尝试在 Spring Cloud Stream 应用程序中使用该主题时遇到反序列化问题。
连接器配置:
{
"name": "my-connector",
"config": {
"name": "my-connector",
"poll.interval.ms": "86400000",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "oracle-jdbc-string",
"connection.user": "testid",
"connection.password": "test",
"catalog.pattern": "mySchema",
"table.whitelist": "MY_TABLE",
"table.types": "TABLE",
"mode": "bulk",
"numeric.mapping": "best_fit",
"transforms": "createKey, extractCode, UpdateTopicName",
"transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractCode.field": "ID",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "ID",
"transforms.UpdateTopicName.regex": "(.*)",
"transforms.UpdateTopicName.replacement": "my_topic",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"topic.prefix": "nt_mny_"
}
}
连接器似乎工作正常,并在主题上放置了适当的消息,并且在使用 kafka-console-consumer 时,示例消息看起来像这样
kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning --property print.key=true | jq '.'
7247
0
{
"ID": 7247,
"USER_SK": 5623,
"TYP_CDE": "TEST",
"ALRT_ACTIVE_FLAG": "Y",
"ALRT_DESC": "My Alert",
"ALRT_STATUS": "VISIBLE",
"CREAT_BY": "ME",
"CREAT_TM": 1593547299565,
"UPD_BY": "ME",
"UPD_TM": 1593547299565
}
我想知道键和值之间打印的 0 是问题还是只是卡夫卡噪音。
我在代码中看到的问题是
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
我的处理器/接收器代码相对简单。
@StreamListener
public void process(
@Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {
myAlertKconnectStream.peek((key,value) -> {
System.out.println("HELOOOOOO");
logger.debug("ece/pre: key={}, value={}",key,value);});
}
我花了几天的时间试图弄清楚这一点,但几乎没有任何帮助!
解决方案
您使用的是 JSON 模式转换器 ( io.confluent.connect.json.JsonSchemaConverter
),而不是 JSON 转换器 ( org.apache.kafka.connect.json.JsonConverter
)。
JSON Schema 转换器使用 Schema Registry 来存储模式,并将有关它的信息放在消息的前面几个字节中。这就是使您的代码(Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)
)绊倒的原因。
所以要么在你的代码中使用JSON Schema 反序列化器(更好),要么切换到使用org.apache.kafka.connect.json.JsonConverter
转换器(不太可取;然后你扔掉模式)。
推荐阅读
- unity3d - LeanTween 由于时间尺度 0 而没有做任何事情。解决方法是什么?
- c# - C# 在 for 中使用字符串
- php - 在 Lumen Laravel '未定义变量:状态'中运行测试时出现 PHPUnitTest 错误
- sql - Oracle SQL 将十六进制值选择为 SHA256
- c# - Xamarin Forms ListView ItemTapped 到来自 ViewModel 的对象
- javascript - 模拟/测试 Vuex 存储模块给了我 TypeError 问题
- javascript - 在asp页面上使用javascript移动浏览器窗口
- docker - 无法在容器内进行 apt update
- assembly - Bomblab - 第二阶段翻译?
- java - 在命令 `java` 中,`-cp` 是否覆盖了 `$CLASSPATH`,或者是它的前缀?