首页 > 解决方案 > 如何将新表添加到 Debezium MySQL 连接器?

问题描述

我有 Debezium MySQL 连接器:

{
    "name": "debezium_mysql",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "***",
        "database.port": "3306",
        "database.user": "kafkaconnect",
        "database.password": "${file:/connect-credentials.properties:mysql_pass}",
        "database.server.name": "mysql",
        "heartbeat.interval​.ms": 5000,
        "snapshot.mode": "when_needed",
        "database.include.list": "compare_sports",
        "table.include.list": "compare_sports.matches,compare_sports.games",
        "database.history.kafka.topic": "mysql_compare_sports_history",
        "database.history​.kafka.recovery​.poll.interval.ms": 5000,
        "database.history.kafka.bootstrap.servers": "***:9092",
        "include.schema.changes": "false",
        "transforms": "extractInt",
        "transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractInt.field": "id"
    }
}

我想从同一个mysql中的其他数据库添加新表(存在很长时间)。将其添加到包含列表后,出现错误:

Encountered change event for table new_database.new_table whose schema isn't known to this connector

我尝试创建新的连接器,snapshot.mode: initial但随后只mysql_history创建了新主题但没有所需的new_database.new_table主题

我应该怎么做才能将新数据库中的新表添加到现有连接器?

谢谢

标签: apache-kafka-connectdebezium

解决方案


您可以创建一个新的连接器并将其设置snapshotinitial并指定一个不同的database.server.id

由于表已经存在,您应该包括一个snapshot.override以及默认实现将选择现有表中的所有行,这些行将持有一个锁并阻止写入者写入数据库。你绝对不想这样做。

{
    ...
    "snapshot.mode"="initial",
    "database.server.id": "different",
    "snapshot.select.statement.overrides":"new-table",
    "snapshot.select.statement.new-table":"select * .. where <>"
    ...
}

或者,如果您想使用相同的连接器名称,则必须停止它并手动清除其在内部connect.offsets.storage主题中的偏移量。然后,当重新创建连接器(使用相同的名称)时,它将根据您提供的快照配置恢复其偏移量。

更多这样的案例,可以看这篇博客(光盘:我是作者)


推荐阅读