首页 > 解决方案 > 如何在 Kafka sink JDBC 连接器中转换和提取字段

问题描述

我正在使用第 3 方 CDC 工具将数据从源数据库复制到 Kafka 主题中。示例行如下所示:

{  
   "data":{  
      "USER_ID":{  
         "string":"1"
      },
      "USER_CATEGORY":{  
         "string":"A"
      }
   },
   "beforeData":{  
      "Data":{  
         "USER_ID":{  
            "string":"1"
         },
         "USER_CATEGORY":{  
            "string":"B"
         }
      }
   },
   "headers":{  
      "operation":"UPDATE",
      "timestamp":"2018-05-03T13:53:43.000"
   }
}

在 sink 文件中需要什么配置才能提取所有(子)字段dataheaders忽略下面的字段beforeData,以便 Kafka Sink 传输数据的目标表将包含以下字段:

USER_ID, USER_CATEGORY, operation, timestamp

我浏览了confluent 文档中的转换列表,但我无法找到如何使用它们来实现上述目标。

标签: apache-kafkaapache-kafka-connect

解决方案


我想你想要ExtractField,不幸的是,这是一个Map.get操作,所以这意味着 1) 嵌套字段不能一次获得 2) 多个字段需要多次转换。

话虽这么说,你可能会尝试这个(未经测试)

transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers

如果这不起作用,您最好实现自己的 Transformations 包,该包至少可以从 Struct / Map 中删除值。


推荐阅读