首页 > 解决方案 > 为什么从 Kafka JDBC 源连接器生成的密钥以 L 为前缀?

问题描述

我正在尝试将 Kafka 连接配置为为从 Oracle 19c 表生成的消息生成密钥。在尝试遵循Confluent 文档中显示的设置时,我遇到了一些意外行为。当在 VARCHAR 字段上将 ValueToKey SMT 与 ExtractField SMT 链接起来时,我的键以 L 和一些空的 Unicode 字符为前缀。这是 Kafka Connect 产生的我的信息:

    [{
      "topic":"BUSINESS_AUD",
      "partition":0,
      "offset":9,
      "timestamp":1617723230767,
      "timestampType":"CREATE_TIME",
      "headers":[],
      "key":"\u0000\u0000\u0000\u0000\u0001\u0002L{B4832FC8-BBCF-488C-9720-97C4D3283FEF}",
      "value":{
         "AUD_ID":{
            "long":11042260
         },
         "REV":{
            "long":80325258
         },
         "ID":{
            "long":31549560804
         },
         "BUSINESSID":{
            "string":"{B4832FC8-BBCF-488C-9720-97C4D3283FEF}"
         },
         "BUSINESS_PROPERTY_LU_ID":{
            "long":24
         },
         "VALUE":{
            "string":"business value"
         },
         "DTYPE":"VERSION"
      }
   }]

这是我的源架构:

    create table AUDITDB.BUSINESS_AUD
(
    AUD_ID                  NUMBER(38)   not null
        constraint PKBUSINESS_AUD 
            primary key,
    REV                     NUMBER(38)   not null
        constraint FKBUSINESS_AUD
            references AUDITDB.REVISION,
    REVTYPE                 NUMBER(3),
    ID                      NUMBER(38),
    BUSINESSID                   VARCHAR2(38),
    BUSINESS_PROPERTY_LU_ID NUMBER(38),
    VALUE                   VARCHAR2(1200) default NULL,
    DTYPE                   VARCHAR2(15) not null
)

这就是我配置 jdbc 连接器的方式:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
    "name": "jdbc_source_oracle_BUSINESS_AUD",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "errors.log.enable": true,
        "errors.log.include.messages": true,
        "connection.url": "jdbc:oracle:thin:@10.0.0.8:7511:t1fnet",
        "connection.user": "oracleUser",
        "connection.password": "oracleUserPassword",
        "mode": "incrementing",
        "incrementing.column.name" : "AUD_ID",
        "numeric.mapping" : "best_fit",
        "poll.interval.ms": "5000",
        "transforms":"createKey,ExtractField",
        "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields":"BUSINESSID",
        "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.ExtractField.field":"BUSINESSID",
        "query" : "SELECT CAST(DMDA.AUD_ID AS NUMBER(18)) AS AUD_ID, CAST(DMDA.REV AS NUMBER(18)) AS REV, CAST(DMDA.ID AS NUMBER(18)) AS ID, DMDA.BUSINESSID, CAST(DMDA.BUSINESS_PROPERTY_LU_ID AS NUMBER(18)) AS BUSINESS_PROPERTY_LU_ID, DMDA.VALUE, DMDA.DTYPE FROM AUDITDB.BUSINESS_AUD DMDA",
        "topic.prefix": "BUSINESS_AUD"
    }
}'


我不确定 L 是从哪里来的。当我尝试使用 NUMBER 字段(如 ID)进行相同设置时,我只会得到 Unicode 垃圾:
[{
    "topic":"BUSINESS_AUD",
    "partition":0,
    "offset":24149,
    "timestamp":1617732719435,
    "timestampType":"CREATE_TIME",
    "headers":[
       
    ],
    "key":"\u0000\u0000\u0000\u0000\u0001\u0002�����\u0001",
    "value":{
       "AUD_ID":{
          "long":11205147
       },
       "REV":{
          "long":81016468
       },
       "ID":{
          "long":31549704671
       },
       "BUSINESSID":{
          "string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
       },
       "BUSINESS_PROPERTY_LU_ID":{
          "long":17
       },
       "VALUE":{
          "string":"{03E796CC-C0AB-4CBD-930E-CA99D9A31362}"
       },
       "DTYPE":"VERSION"
    }
 }]

我相当确定问题在于我如何使用 ExtractField SMT,因为当我从链中删除该 SMT 时,它会产生我对 ValueToKey SMT 的期望:

[{
  "topic":"BUSINESS_AUD",
  "partition":0,
  "offset":27311,
  "timestamp":1617733541872,
  "timestampType":"CREATE_TIME",
  "headers":[
     
  ],
  "key":{
     "BUSINESSID":{
        "string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
     }
  },
  "value":{
     "AUD_ID":{
        "long":11213627
     },
     "REV":{
        "long":81114719
     },
     "ID":{
        "long":31549717943
     },
     "BUSINESSID":{
        "string":"{C2D8CAA4-C964-4AFE-B194-21651187BD23}"
     },
     "BUSINESS_PROPERTY_LU_ID":{
        "long":24
     },
     "VALUE":{
        "string":"businessValue"
     },
     "DTYPE":"VERSION"
  }
}]

这让我相信 ExtractField 对从 ValueToKey 输出的对象有困难,我只是不确定要改变什么来获得我期望的交互。

我真的被困在这里,对此的任何帮助将不胜感激。

标签: apache-kafkaapache-kafka-connectconfluent-platform

解决方案


所以,我要说显示的输出是使用 Avro 作为键的错误(或者,至少对于字符串或整数等原始字段)

我不确定 L,但它是二进制数据 UTF8 解码的结果

前缀\u0000\u0000\u0000\u0000\u00010x0+ (int 1),这是您BUSINESS_AUD-key在注册表中拥有的架构 ID。

通常,由于键很少是结构化类型,因此您应该尝试使用相应key.converter的类来提取要提取的类型,例如 StringConverter


推荐阅读