apache-kafka - 为什么从 Kafka JDBC 源连接器生成的密钥以 L 为前缀?
问题描述
我正在尝试将 Kafka 连接配置为为从 Oracle 19c 表生成的消息生成密钥。在尝试遵循Confluent 文档中显示的设置时,我遇到了一些意外行为。当在 VARCHAR 字段上将 ValueToKey SMT 与 ExtractField SMT 链接起来时,我的键以 L 和一些空的 Unicode 字符为前缀。这是 Kafka Connect 产生的我的信息:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":9,
"timestamp":1617723230767,
"timestampType":"CREATE_TIME",
"headers":[],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002L{B4832FC8-BBCF-488C-9720-97C4D3283FEF}",
"value":{
"AUD_ID":{
"long":11042260
},
"REV":{
"long":80325258
},
"ID":{
"long":31549560804
},
"BUSINESSID":{
"string":"{B4832FC8-BBCF-488C-9720-97C4D3283FEF}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"business value"
},
"DTYPE":"VERSION"
}
}]
这是我的源架构:
create table AUDITDB.BUSINESS_AUD
(
AUD_ID NUMBER(38) not null
constraint PKBUSINESS_AUD
primary key,
REV NUMBER(38) not null
constraint FKBUSINESS_AUD
references AUDITDB.REVISION,
REVTYPE NUMBER(3),
ID NUMBER(38),
BUSINESSID VARCHAR2(38),
BUSINESS_PROPERTY_LU_ID NUMBER(38),
VALUE VARCHAR2(1200) default NULL,
DTYPE VARCHAR2(15) not null
)
这就是我配置 jdbc 连接器的方式:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_oracle_BUSINESS_AUD",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"errors.log.enable": true,
"errors.log.include.messages": true,
"connection.url": "jdbc:oracle:thin:@10.0.0.8:7511:t1fnet",
"connection.user": "oracleUser",
"connection.password": "oracleUserPassword",
"mode": "incrementing",
"incrementing.column.name" : "AUD_ID",
"numeric.mapping" : "best_fit",
"poll.interval.ms": "5000",
"transforms":"createKey,ExtractField",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"BUSINESSID",
"transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.ExtractField.field":"BUSINESSID",
"query" : "SELECT CAST(DMDA.AUD_ID AS NUMBER(18)) AS AUD_ID, CAST(DMDA.REV AS NUMBER(18)) AS REV, CAST(DMDA.ID AS NUMBER(18)) AS ID, DMDA.BUSINESSID, CAST(DMDA.BUSINESS_PROPERTY_LU_ID AS NUMBER(18)) AS BUSINESS_PROPERTY_LU_ID, DMDA.VALUE, DMDA.DTYPE FROM AUDITDB.BUSINESS_AUD DMDA",
"topic.prefix": "BUSINESS_AUD"
}
}'
我不确定 L 是从哪里来的。当我尝试使用 NUMBER 字段(如 ID)进行相同设置时,我只会得到 Unicode 垃圾:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":24149,
"timestamp":1617732719435,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":"\u0000\u0000\u0000\u0000\u0001\u0002�����\u0001",
"value":{
"AUD_ID":{
"long":11205147
},
"REV":{
"long":81016468
},
"ID":{
"long":31549704671
},
"BUSINESSID":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":17
},
"VALUE":{
"string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
},
"DTYPE":"VERSION"
}
}]
我相当确定问题在于我如何使用 ExtractField SMT,因为当我从链中删除该 SMT 时,它会产生我对 ValueToKey SMT 的期望:
[{
"topic":"BUSINESS_AUD",
"partition":0,
"offset":27311,
"timestamp":1617733541872,
"timestampType":"CREATE_TIME",
"headers":[
],
"key":{
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
}
},
"value":{
"AUD_ID":{
"long":11213627
},
"REV":{
"long":81114719
},
"ID":{
"long":31549717943
},
"BUSINESSID":{
"string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
},
"BUSINESS_PROPERTY_LU_ID":{
"long":24
},
"VALUE":{
"string":"businessValue"
},
"DTYPE":"VERSION"
}
}]
这让我相信 ExtractField 对从 ValueToKey 输出的对象有困难,我只是不确定要改变什么来获得我期望的交互。
我真的被困在这里,对此的任何帮助将不胜感激。
解决方案
所以,我要说显示的输出是使用 Avro 作为键的错误(或者,至少对于字符串或整数等原始字段)
我不确定 L,但它是二进制数据 UTF8 解码的结果
前缀\u0000\u0000\u0000\u0000\u0001
是0x0
+ (int 1)
,这是您BUSINESS_AUD-key
在注册表中拥有的架构 ID。
通常,由于键很少是结构化类型,因此您应该尝试使用相应key.converter
的类来提取要提取的类型,例如 StringConverter
推荐阅读
- maven - 无法从配置文件配置附加系统属性
- fortran - Gfortran 编译错误:必须将 (1) 处的逻辑与 .eqv 进行比较。而不是==
- node.js - 在 var 中使用查询字符串进行搜索在 mongoose 中不起作用
- php - JSON在解码时只返回第一个字符?
- c# - 是否可以仅为单个命名空间创建 Nuget 包?
- c# - 如何在线找到最近的点?
- python - 有什么方法可以在没有 cmp sort 参数的情况下对大对象进行排序?
- pandas - 具有多个元素的数组的真值是不明确的。使用 a.any() 或 a.all()',
- npm - 进行 npm 审计修复时不修改 package.json
- c++ - 从另一个文件c ++访问变量值