apache-kafka - 使用 jdbc-sink-connector 中的 kafka SMT 将 avro 模式中的 Map 字段转换为字符串?
问题描述
我有一个定义如下的 avro 模式:
[
{
"namespace": "com.fun.message",
"type": "record",
"name": "FileData",
"doc": "Avro Schema for FileData",
"fields": [
{"name": "id", "type": "string", "doc": "Unique file id" },
{"name": "absolutePath", "type": "string", "doc": "Absolute path of file" },
{"name": "fileName", "type": "string", "doc": "File name" },
{"name": "source", "type": "string", "doc": "unique identification of source" },
{"name": "metaData", "type": {"type": "map", "values": "string"}}
]
}
]
我想使用 jdbc-sink-connector 将此数据推送到 postgres,以便我可以将"metaData"
架构中的字段(映射类型)转换为字符串。我该怎么做呢?
解决方案
您需要使用 SMT 和 AFAIK,目前没有完全满足您要求的 SMT(ExtractField
是一种Map.get
操作,因此无法一次性提取嵌套字段)。您可以查看 Debezium 的io.debezium.transforms.UnwrapFromEnvelope
SMT,您可以对其进行修改以提取嵌套字段。
UnwrapFromEnvelope
正在用于CDC 事件展平,以便从更复杂的结构中提取字段,例如由 Debezium 形成的数据(我相信它与您的结构相似)。
推荐阅读
- ios - UITableView 背景视图自动调整大小
- json - 如果不是数组 powershell,则将 JSON 属性更改为数组
- ssas - MDX // 过滤列上的 DESCANDANTS,包括 ALL
- vue.js - 如何自动测试所有必需的属性都提供给子组件?
- c - 不退出“while”循环
- graphql - (中继)根据查询结果重新运行查询的正确语法是什么
- python-3.x - 在我的函数中以预期的方式排序我的列表时出现问题
- python - Peewee ForeignKeyField as BigIntegerField
- kendo-ui - 如何向我的 Kendo UI 调度程序添加新的布尔值?
- powershell - 用于查找用户运行的进程的 powershell 脚本,如果没有发送电子邮件