apache-kafka - 如何在 Kafka sink JDBC 连接器中转换和提取字段
问题描述
我正在使用第 3 方 CDC 工具将数据从源数据库复制到 Kafka 主题中。示例行如下所示:
{
"data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"A"
}
},
"beforeData":{
"Data":{
"USER_ID":{
"string":"1"
},
"USER_CATEGORY":{
"string":"B"
}
}
},
"headers":{
"operation":"UPDATE",
"timestamp":"2018-05-03T13:53:43.000"
}
}
在 sink 文件中需要什么配置才能提取所有(子)字段data
并headers
忽略下面的字段beforeData
,以便 Kafka Sink 传输数据的目标表将包含以下字段:
USER_ID, USER_CATEGORY, operation, timestamp
我浏览了confluent 文档中的转换列表,但我无法找到如何使用它们来实现上述目标。
解决方案
我想你想要ExtractField
,不幸的是,这是一个Map.get
操作,所以这意味着 1) 嵌套字段不能一次获得 2) 多个字段需要多次转换。
话虽这么说,你可能会尝试这个(未经测试)
transforms=ExtractData,ExtractHeaders
transforms.ExtractData.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractData.field=data
transforms.ExtractHeaders.type=org.apache.kafka.connect.transforms.ExtractField$Value
transforms.ExtractHeaders.field=headers
如果这不起作用,您最好实现自己的 Transformations 包,该包至少可以从 Struct / Map 中删除值。