mysql - 为数据库中的多个表配置 debezium 连接器
问题描述
我正在尝试为 MySQL 数据库中的多个表配置 Debezium 连接器(我在 MySQL 8.0 上使用 debezium 1.4)。在kafka中创建主题时,我的公司有一个命名模式要遵循,并且这种模式不允许使用下划线(_),所以我不得不用连字符(-)替换它们
所以,我的主题名称是:
话题一
fjf.db.top-domain.domain.sub-domain.transaction-search.order-status
WHERE
- transaction-search = schema "transaction_search"
- order-status = table "order_status".
- All changes in that table, must go to that topic.
话题二
fjf.db.top-domain.domain.sub-domain.transaction-search.shipping-tracking
WHERE
- transaction-search = schema "transaction_search"
- shipping-tracking = table "shipping_tracking"
- All changes in that table, must go to that topic.
话题 3
fjf.db.top-domain.domain.sub-domain.transaction-search.proposal
WHERE
- transaction-search = schema "transaction_search"
- proposal = table "proposal"
- All changes in that table, must go to that topic.
我正在尝试使用转换“ByLogicalTableRouter”,但我找不到解决我的情况的正则表达式解决方案。
{ "name": "debezium.connector",
"config":
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "myhostname",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "1000",
"database.server.name": "fjf.db.top-domain.domain.sub-domain.transaction-search",
"schema.include.list": "transaction_search",
"table.include.list": "transaction_search.order_status,transaction_search.shipping_tracking,transaction_search.proposal",
"database.history.kafka.bootstrap.servers": "kafka.intranet:9097",
"database.history.kafka.topic": "fjf.db.top-domain.domain.sub-domain.transaction-search.schema-history",
"snapshot.mode": "schema_only",
"transforms":"RerouteName,RerouteUnderscore",
"transforms.RerouteName.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteName.topic.regex":"(.*)transaction_search(.*)",
"transforms.RerouteName.topic.replacement": "$1$2"
"transforms.RerouteUnderscore.type":"io.debezium.transforms.ByLogicalTableRouter",
"transforms.RerouteUnderscore.topic.regex":"(.*)_(.*)",
"transforms.RerouteUnderscore.topic.replacement": "$1-$2"
}
}
- 在第一次转换中,我试图删除主题路由中重复的模式名称。
- 在第二个变换中,替换所有剩余的下划线 _ 代表 hiphens -
但是有了这个,我得到了下面的错误,这表明它正在尝试将所有内容发送到同一个主题
Caused by: org.apache.kafka.connect.errors.SchemaBuilderException: Cannot create field because of field name duplication __dbz__physicalTableIdentifier
如何进行转换,将每个表的事件转发到各自的主题?
解决方案
- 删除架构名称
在第一次转换中,我试图删除主题路由中重复的模式名称。
使用正则表达式进行转换后,您将有两个点,因此您需要修复它:
"transforms.RerouteName.topic.regex":"([^.]+)\\.transaction_search\\.([^.]+)",
"transforms.RerouteName.topic.replacement": "$1.$2"
- 替换下划线
您可以尝试使用来自Kafka Connect Common Transformations的ChangeCase SMT 。
推荐阅读
- python - 使用 Scipy 将一组数据点拟合到任意曲线时出现问题
- python - 将 cv2 形状从 python 转换为 c++
- java - 使用 Omnikey 5022 读取 PACS(原始韦根)数据
- google-sheets - Google 表格选择、计数、限制
- vb.net - 获取服务器响应 vb.net 时出现 404 错误
- html - 使用 typeScript 滚动到我的 webView 上的 x,y 坐标
- sql - 需要:Pl/pgSQL 函数的“COPY to/from ...”的替代方案
- angular6 - 多项目轮播在Angular 6中不起作用
- python - Python烧瓶应用程序在Windows中作为Windows服务多处理运行
- c# - 如何从 Visual Studio Fake 中删除引用?