apache-kafka-connect - 如何将新表添加到 Debezium MySQL 连接器?
问题描述
我有 Debezium MySQL 连接器:
{
"name": "debezium_mysql",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "***",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "${file:/connect-credentials.properties:mysql_pass}",
"database.server.name": "mysql",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "when_needed",
"database.include.list": "compare_sports",
"table.include.list": "compare_sports.matches,compare_sports.games",
"database.history.kafka.topic": "mysql_compare_sports_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "***:9092",
"include.schema.changes": "false",
"transforms": "extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id"
}
}
我想从同一个mysql中的其他数据库添加新表(存在很长时间)。将其添加到包含列表后,出现错误:
Encountered change event for table new_database.new_table whose schema isn't known to this connector
我尝试创建新的连接器,snapshot.mode: initial
但随后只mysql_history
创建了新主题但没有所需的new_database.new_table
主题
我应该怎么做才能将新数据库中的新表添加到现有连接器?
谢谢
解决方案
您可以创建一个新的连接器并将其设置snapshot
为initial
并指定一个不同的database.server.id
由于表已经存在,您应该包括一个snapshot.override
以及默认实现将选择现有表中的所有行,这些行将持有一个锁并阻止写入者写入数据库。你绝对不想这样做。
{
...
"snapshot.mode"="initial",
"database.server.id": "different",
"snapshot.select.statement.overrides":"new-table",
"snapshot.select.statement.new-table":"select * .. where <>"
...
}
或者,如果您想使用相同的连接器名称,则必须停止它并手动清除其在内部connect.offsets.storage
主题中的偏移量。然后,当重新创建连接器(使用相同的名称)时,它将根据您提供的快照配置恢复其偏移量。
更多这样的案例,可以看这篇博客(光盘:我是作者)
推荐阅读
- python - 在 pandas 中查找本地重复项(彼此跟随)
- python - 钩子 `black` 需要预先提交 2.9.2 版,但安装的是 2.6.0 版
- .net - Microsoft.Build.CentralPackageVersions 和文件夹结构
- javascript - 从firebase实时数据库获取准确的Unix时间戳?
- ios - 当用户尝试使用 PHPickerViewController 在 IOS 14、swift 5 中使用选定的照片权限检索视频时,获取视频数据为零
- python - 如何在 Scrapy 中将产品变体分成单独的行?
- javascript - 如何在浏览器 Chrome 中使用 js ::占位符颜色
- javascript - 提取除特定字段外所有字段均等于的对象
- c# - Page.ClientScript.RegisterStartupScript 不加载弹出窗口
- javascript - 如何使用正则表达式屏蔽电子邮件地址?