首页 > 解决方案 > 使用 jdbc-sink-connector 中的 kafka SMT 将 avro 模式中的 Map 字段转换为字符串?

问题描述

我有一个定义如下的 avro 模式:

[    
{
    "namespace": "com.fun.message",
    "type": "record",
    "name": "FileData",
    "doc": "Avro Schema for FileData",
    "fields": [
        {"name": "id", "type": "string", "doc": "Unique file id" },
        {"name": "absolutePath", "type": "string", "doc": "Absolute path of file" },
        {"name": "fileName", "type": "string", "doc": "File name" },
        {"name": "source", "type": "string", "doc": "unique identification of source" },
        {"name": "metaData", "type": {"type": "map", "values": "string"}}
    ]
}
]

我想使用 jdbc-sink-connector 将此数据推送到 postgres,以便我可以将"metaData"架构中的字段(映射类型)转换为字符串。我该怎么做呢?

标签: apache-kafkaapache-kafka-connect

解决方案


您需要使用 SMT 和 AFAIK,目前没有完全满足您要求的 SMT(ExtractField是一种Map.get操作,因此无法一次性提取嵌套字段)。您可以查看 Debezium 的io.debezium.transforms.UnwrapFromEnvelopeSMT,您可以对其进行修改以提取嵌套字段。

UnwrapFromEnvelope正在用于CDC 事件展平,以便从更复杂的结构中提取字段,例如由 Debezium 形成的数据(我相信它与您的结构相似)。


推荐阅读