apache-kafka-connect - 是否可以重命名消息触发运行 debezium mysql 连接器的字段?
问题描述
我已经配置了一个 debezium mysql 连接器,我需要在有效负载中包含附加字段作为表名。我需要做哪些配置更改才能实现这一目标?
解决方案
表名已包含在source.table
元素中。这是一个插入到名为 的表的示例消息rental
:
{
"before": null,
"after": {
"fullfillment.sakila.rental.Value": {
"rental_id": 13346,
"rental_date": 1124483301000,
"inventory_id": 4541,
"customer_id": 131,
"return_date": {
"long": 1125188901000
},
"staff_id": 2,
"last_update": "2006-02-15T21:30:53Z"
}
},
"source": {
"name": "fullfillment",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000002",
"pos": 832,
"row": 0,
"snapshot": {
"boolean": true
},
"thread": null,
"db": {
"string": "sakila"
},
"table": {
"string": "rental"
}
},
"op": "c",
"ts_ms": {
"long": 1518190060267
}
}
如果您想插入其他字段,您可以使用InsertField$Value
单消息转换,您可以在本文中看到一个示例。
编辑:
如果您希望该字段位于消息的不同部分,您有几个选择。您可以使用 Kafka Streams 对数据进行后处理,以根据需要对其进行重组。您可以使用可用的 Single Message Transform 来展平after
组件,然后添加静态值:
"transforms": "unwrap,InsertTopic,InsertSourceDetails",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.InsertTopic.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTopic.topic.field":"messagetopic",
"transforms.InsertSourceDetails.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertSourceDetails.static.field":"messagesource",
"transforms.InsertSourceDetails.static.value":"Debezium CDC from Oracle on asgard"
或者您可以编写自己的单消息转换来准确地进行您想要做的修改。
推荐阅读
- amazon-web-services - 如何使 SAM 模板文件适用于两种不同的环境?
- office-js - 使用 office-js 在 Outlook 插件上实现自定义发送邮件按钮
- ios - 使用触控板在 TableView 上滚动
- angular - 无法使用表单组中的嵌套表单数组正确显示表单
- typescript - 静态方法不是函数:Typescript。Vue
- python - 在 SQLite 查询中比较两个可能为 NULL 的值
- json - 如何在 Apache Flink 流应用程序输出中使用编码的 Unicode 字符解码 Json 文本
- c++ - 程序因未知原因退出 (0xC0000005)
- swift - 用 NSRegularExpression 结果对字符串进行子串化的正确方法是什么?
- c# - 变量没有被赋值,Prefab在我改变了引用游戏对象的值后丢失了引用