mysql - How to filter the tables in a database with the Kafka JDBC source connector
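Problem description
I'm using Kafka Connect on the Confluent Community Platform to keep MySQL databases in sync. Both the sources and the sinks are MySQL databases. It isn't working.
I have a few problems with my setup:
1. There are tables in other databases on the same server that I don't want read into Kafka, but the Kafka Connect source keeps trying to read those other databases.
2. I want to use org.apache.kafka.connect.json.JsonConverter in both the source and sink connectors, but the sink connector can't insert the data correctly.
3. I want to sync several databases, and tables in different databases may have the same name. How do I avoid table-name collisions and get the sink connector to route each Kafka topic so the data is inserted into the right database?
[Figure: MySQL synchronization diagram]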
The Kafka JDBC source connector configuration is:
{
"name": "br-auths-3910472223-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/br_auths?user=root&password=123456",
"database.whitelist":"br_auths",
"table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
"mode": "timestamp",
"timestamp.column.name": "utime",
"validate.non.null": "false",
"incrementing.column.name": "id",
"topic.prefix": "br_auths__"
}
}
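As far as I know, `database.whitelist` is a Debezium property that the JDBC source connector does not read, so with this config only `table.blacklist` takes effect, which is consistent with other databases still being picked up. A minimal Python sketch of that difference, assuming fully qualified `database.table` names as in the config above; `select_tables`, `parse_list`, and the `other_db.orders` table are hypothetical:

```python
from typing import Iterable, List, Optional, Set


def parse_list(value: Optional[str]) -> Optional[Set[str]]:
    """Split a comma-separated connector list property into a set of names."""
    if not value:
        return None
    return {item.strip() for item in value.split(",") if item.strip()}


def select_tables(tables: Iterable[str],
                  whitelist: Optional[str] = None,
                  blacklist: Optional[str] = None) -> List[str]:
    """Hypothetical model of white/blacklist filtering on 'db.table' names."""
    allow = parse_list(whitelist)
    deny = parse_list(blacklist) or set()
    return [t for t in tables
            if (allow is None or t in allow) and t not in deny]


# Tables visible on the server (other_db.orders is a made-up example).
tables = ["br_auths.auths_users", "br_auths.auths_roles",
          "br_auths.__migrationversions", "br_auths.auths_service_apps",
          "other_db.orders"]

# Only the blacklist from the config above: other_db.orders slips through.
print(select_tables(tables,
                    blacklist="br_auths.__migrationversions,br_auths.auths_service_apps"))
# → ['br_auths.auths_users', 'br_auths.auths_roles', 'other_db.orders']

# A whitelist names exactly the tables to copy.
print(select_tables(tables, whitelist="br_auths.auths_users,br_auths.auths_roles"))
# → ['br_auths.auths_users', 'br_auths.auths_roles']
```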
The Kafka JDBC sink connector configuration is:
{
"name": "br-auths-3910472223-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"true",
"tasks.max": "1",
"connection.url": "jdbc:mysql://rm-hp303a0n2vr8970.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?user=br_auths&password=@123456",
"topics": "br_auths__auths_roles,br_auths__auths_user_logins,br_auths__auths_user_roles,br_auths__auths_users,br_auths__auths_user_claims,br_auths__auths_user_tokens,br_auths__auths_role_claims",
"auto.create": "true",
"insert.mode": "upsert",
"transforms":"dropTopicPrefix",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"br_auths__(.*)",
"transforms.dropTopicPrefix.replacement":"$1"
}
}
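The `dropTopicPrefix` transform renames each topic before the sink derives a table name from it (the JDBC sink's default `table.name.format` is `${topic}`). A minimal Python model of that renaming, assuming RegexRouter's behaviour of renaming only on a full match; `regex_router` is a hypothetical helper, not part of Kafka:

```python
import re


def regex_router(topic: str, regex: str, replacement: str) -> str:
    """Hypothetical Python model of Kafka's RegexRouter SMT.

    The topic is renamed only when the regex matches the WHOLE name;
    otherwise it passes through unchanged. Java-style "$1" group
    references are translated to Python's "\\1" syntax first.
    """
    match = re.fullmatch(regex, topic)
    if match is None:
        return topic
    return match.expand(re.sub(r"\$(\d+)", r"\\\1", replacement))


for topic in ["br_auths__auths_roles", "br_auths__auths_users", "other__auths_roles"]:
    print(topic, "->", regex_router(topic, "br_auths__(.*)", "$1"))
```

With `br_auths__(.*)` and `$1`, `br_auths__auths_roles` becomes the table name `auths_roles`, while a topic from another prefix is left alone.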
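I want to create several source/sink connector pairs for different databases, so that some whitelisted tables in database A on MySQL server A are incrementally synced to database A on MySQL server B.
Update 1:
I switched to connect-avro-distributed, the Debezium source connector, and the JDBC sink connector. The source connector is: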
{
"name":"br-auths-3910472223-source",
"config":{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "root",
"database.password": "br123456",
"database.useLegacyDatetimeCode": "false",
"database.server.id": "184",
"database.server.name": "local3910472223",
"database.whitelist":"br_auths",
"database.history.kafka.bootstrap.servers": "localhost:9092",
"database.history.kafka.topic": "schema-changes.br-auths.local3910472223" ,
"table.blacklist": "br_auths.__migrationversions,br_auths.auths_service_apps",
"include.schema.changes": "true",
"transforms": "route,TimestampConverter",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.target.type": "string",
"transforms.TimestampConverter.field": "payload.after.ctime",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$2__$3"
}
}
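Debezium names its data topics `<database.server.name>.<database>.<table>`, and with `include.schema.changes` it writes DDL events to a topic named after `database.server.name` alone. A minimal sketch of what the `route` transform does to those names; note that the doubled backslash in the config is JSON escaping, so the regex itself contains `\.`. The `route` helper is hypothetical and only mirrors RegexRouter's full-match rule:

```python
import json
import re

# The two route properties exactly as they appear in the JSON config above.
# In JSON, "\\." decodes to the regex "\." (a literal dot).
config = json.loads(r'''{
  "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
  "transforms.route.replacement": "$2__$3"
}''')

REGEX = config["transforms.route.regex"]
# Translate Java-style "$n" group references into Python's "\n" syntax.
TEMPLATE = re.sub(r"\$(\d+)", r"\\\1", config["transforms.route.replacement"])


def route(topic: str) -> str:
    """Rename <server>.<database>.<table> to <database>__<table>.

    Like RegexRouter, only topics the regex matches completely are renamed.
    """
    match = re.fullmatch(REGEX, topic)
    return match.expand(TEMPLATE) if match else topic


print(REGEX)  # → ([^.]+)\.([^.]+)\.([^.]+)
print(route("local3910472223.br_auths.auths_user_roles"))  # → br_auths__auths_user_roles
print(route("local3910472223"))  # schema-change topic has no dots, so it is unchanged
```

The renamed topic, e.g. `br_auths__auths_user_roles`, is what the sink's `topics.regex` and `dropTopicPrefix` then match on.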
The sink connector is:
{
"name": "br-auths-3910472223-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://rm-hp303a0n2.mysql.huhehaote.rds.aliyuncs.com:3306/dev-br-auths-391047222?useLegacyDatetimeCode=false&user=br_auths&password=123456",
"dialect.name": "MySqlDatabaseDialect",
"topics.regex": "br_auths__(.*)",
"transforms": "dropTopicPrefix,unwrap",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"br_auths__(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"insert.mode": "upsert",
"pk.fields": "Id",
"pk.mode": "record_value"
}
}
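The `unwrap` step exists because the JDBC sink expects one flat row per record, while Debezium emits an envelope with `before`, `after`, `source`, and `op`. A minimal Python model of that flattening, applied to the payload of the message shown below; `unwrap` is a hypothetical helper, and its handling of deletes is a simplification, not the SMT's documented behaviour:

```python
from typing import Any, Dict, Optional


def unwrap(envelope: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Hypothetical model of Debezium's unwrap SMT on the envelope's payload.

    Create ("c"), read/snapshot ("r") and update ("u") events become the flat
    row image in "after". Deletes are returned as None here; what the real
    SMT emits for them depends on its version and configuration.
    """
    if envelope.get("op") == "d" or envelope.get("after") is None:
        return None
    return dict(envelope["after"])


# The "payload" part of the message below, trimmed to the fields used here.
payload = {
    "before": None,
    "after": {"Id": "DB4DA841364860D112C3C76BDCB36635", "UserId": "0000000000",
              "RoleId": "5b7e5f9b4bc00d89c4cf96ae", "APPID": "br.region2",
              "IsDeleted": 0, "ctime": 1550138524000, "utime": 1550138524000},
    "op": "c",
    "ts_ms": 1550568556614,
}

row = unwrap(payload)
# With pk.mode=record_value and pk.fields=Id, the upsert key comes from the row.
print(row["Id"], sorted(row))
```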
The Avro message, converted to JSON, looks like this:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "Id"
},
{
"type": "string",
"optional": false,
"field": "UserId"
},
{
"type": "string",
"optional": false,
"field": "RoleId"
},
{
"type": "string",
"optional": true,
"field": "APPID"
},
{
"type": "int32",
"optional": false,
"default": 0,
"field": "IsDeleted"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"default": 0,
"field": "ctime"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"default": 0,
"field": "utime"
}
],
"optional": true,
"name": "local3910472223.br_auths.auths_user_roles.Value",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "Id"
},
{
"type": "string",
"optional": false,
"field": "UserId"
},
{
"type": "string",
"optional": false,
"field": "RoleId"
},
{
"type": "string",
"optional": true,
"field": "APPID"
},
{
"type": "int32",
"optional": false,
"default": 0,
"field": "IsDeleted"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"default": 0,
"field": "ctime"
},
{
"type": "int64",
"optional": false,
"name": "io.debezium.time.Timestamp",
"version": 1,
"default": 0,
"field": "utime"
}
],
"optional": true,
"name": "local3910472223.br_auths.auths_user_roles.Value",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "server_id"
},
{
"type": "int64",
"optional": false,
"field": "ts_sec"
},
{
"type": "string",
"optional": true,
"field": "gtid"
},
{
"type": "string",
"optional": false,
"field": "file"
},
{
"type": "int64",
"optional": false,
"field": "pos"
},
{
"type": "int32",
"optional": false,
"field": "row"
},
{
"type": "boolean",
"optional": true,
"default": false,
"field": "snapshot"
},
{
"type": "int64",
"optional": true,
"field": "thread"
},
{
"type": "string",
"optional": true,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "table"
},
{
"type": "string",
"optional": true,
"field": "query"
}
],
"optional": false,
"name": "io.debezium.connector.mysql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
}
],
"optional": false,
"name": "local3910472223.br_auths.auths_user_roles.Envelope"
},
"payload": {
"before": null,
"after": {
"Id": "DB4DA841364860D112C3C76BDCB36635",
"UserId": "0000000000",
"RoleId": "5b7e5f9b4bc00d89c4cf96ae",
"APPID": "br.region2",
"IsDeleted": 0,
"ctime": 1550138524000,
"utime": 1550138524000
},
"source": {
"version": "0.8.3.Final",
"name": "local3910472223",
"server_id": 0,
"ts_sec": 0,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 64606,
"row": 0,
"snapshot": true,
"thread": null,
"db": "br_auths",
"table": "auths_user_roles",
"query": null
},
"op": "c",
"ts_ms": 1550568556614
}
}
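Columns of MySQL datetime type are serialized as large integers (milliseconds since the epoch), and the JDBC sink connector fails when it tries to insert them into MySQL datetime columns.
So I added transforms.TimestampConverter to the source connector configuration, but the ctime and utime columns did not change. What's wrong?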
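The integers are `io.debezium.time.Timestamp` values, i.e. milliseconds since the epoch; as I understand the Debezium documentation, a MySQL DATETIME carries no time zone and is encoded as if it were UTC. One likely reason the transform has no effect (an assumption worth verifying): as far as I know, TimestampConverter's `field` names a single top-level field of the record value, while `payload` exists only in the JsonConverter's serialized form and `ctime` sits inside the envelope's `after` struct, so `payload.after.ctime` matches nothing. Converting after the unwrap step, where `ctime` and `utime` are top-level (one TimestampConverter instance per field), avoids that. A minimal Python sketch of the conversion itself; the helper names are hypothetical:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Tuple


def debezium_timestamp_to_string(millis: int) -> str:
    """Format an io.debezium.time.Timestamp as 'yyyy-MM-dd HH:mm:ss'.

    Assumes the value encodes the DATETIME's wall-clock time as if it were
    UTC, so it is decoded in UTC to get the original value back.
    """
    dt = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S")


def convert_fields(row: Dict[str, Any], fields: Tuple[str, ...]) -> Dict[str, Any]:
    """Convert the named TOP-LEVEL fields of an already-unwrapped row."""
    out = dict(row)
    for name in fields:
        if out.get(name) is not None:
            out[name] = debezium_timestamp_to_string(out[name])
    return out


row = {"Id": "DB4DA841364860D112C3C76BDCB36635",
       "ctime": 1550138524000, "utime": 1550138524000}
converted = convert_fields(row, ("ctime", "utime"))
print(converted["ctime"])  # → 2019-02-14 10:02:04
```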
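Solution
- If you want to keep your databases in sync, the JDBC source connector isn't the best option; you want proper log-based CDC, which for MySQL you can get from Debezium. More details here.
- If you aren't doing anything else with the data, do you even need Kafka? Would a dedicated MySQL replication tool be a better fit?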
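To illustrate the first point: the JDBC source is query-based. In `timestamp` mode it repeatedly runs a query roughly of the form `SELECT ... WHERE utime > <last offset> AND utime < <now> ORDER BY utime`, so it sees rows that were inserted or updated, but a deleted row simply stops appearing. A log-based tool such as Debezium reads the delete from the binlog instead. A simplified sketch, with sqlite3 standing in for MySQL, integers standing in for timestamps, and a hypothetical `poll` helper and `name` column:

```python
import sqlite3
from typing import List, Tuple


def poll(conn: sqlite3.Connection, table: str, ts_col: str,
         last_seen: int, now: int) -> List[Tuple]:
    """Simplified timestamp-mode query: rows changed in (last_seen, now).

    Table and column names are interpolated directly; fine for a sketch only.
    """
    sql = (f"SELECT * FROM {table} WHERE {ts_col} > ? AND {ts_col} < ? "
           f"ORDER BY {ts_col} ASC")
    return conn.execute(sql, (last_seen, now)).fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE auths_users (id TEXT PRIMARY KEY, name TEXT, utime INTEGER)")
conn.executemany("INSERT INTO auths_users VALUES (?, ?, ?)",
                 [("u1", "alice", 100), ("u2", "bob", 200)])

first = poll(conn, "auths_users", "utime", 0, 1000)
print(first)  # → [('u1', 'alice', 100), ('u2', 'bob', 200)]
last_seen = first[-1][2]  # the stored offset: highest utime seen so far

# An UPDATE that bumps utime is picked up on the next poll...
conn.execute("UPDATE auths_users SET name = 'alice2', utime = 300 WHERE id = 'u1'")
# ...but a DELETE leaves no row behind, so nothing is ever emitted for u2.
conn.execute("DELETE FROM auths_users WHERE id = 'u2'")
second = poll(conn, "auths_users", "utime", last_seen, 1000)
print(second)  # → [('u1', 'alice2', 300)]
```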
On to your specific questions. This article will answer a lot of them. In particular:
There are tables in other databases on the same server that I don't want read into Kafka, but the Kafka Connect source keeps trying to read the other databases.
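Use a combination of table.whitelist, table.blacklist, and schema.pattern as needed. If you can't match everything you need with one connector, you'll need multiple connectors to achieve the setup you want.
I want to use org.apache.kafka.connect.json.JsonConverter in the source and sink connectors, but the sink connector can't insert correctly.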
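Without an explanation of what "can't insert correctly" means, this is hard to answer. In general I would use Avro, because it has richer schema support and more efficient messages (no embedded schema; the schema is stored in the Schema Registry). See here for more details.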
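To make the "embedded schema" point concrete: with `value.converter.schemas.enable=true`, JsonConverter writes every message as a `{"schema": ..., "payload": ...}` pair (as in the message in the question), and the JDBC sink needs that schema to build its SQL. With Avro and Schema Registry, each message instead carries only a short schema ID. A small sketch measuring the per-message overhead for one row; the schema is hand-written to mirror five of the fields of the `after` struct above:

```python
import json

# One auths_user_roles row (values from the message in the question).
row = {"Id": "DB4DA841364860D112C3C76BDCB36635", "UserId": "0000000000",
       "RoleId": "5b7e5f9b4bc00d89c4cf96ae", "APPID": "br.region2", "IsDeleted": 0}

# With schemas.enable=true, JsonConverter wraps every value as
# {"schema": ..., "payload": ...}, repeating the schema in each message.
schema = {"type": "struct", "optional": False, "fields": [
    {"type": "string", "optional": False, "field": "Id"},
    {"type": "string", "optional": False, "field": "UserId"},
    {"type": "string", "optional": False, "field": "RoleId"},
    {"type": "string", "optional": True, "field": "APPID"},
    {"type": "int32", "optional": False, "field": "IsDeleted"},
]}
with_schema = json.dumps({"schema": schema, "payload": row})
without_schema = json.dumps(row)

overhead = len(with_schema) - len(without_schema)
print(f"{len(with_schema)} bytes with schema, {len(without_schema)} without "
      f"(+{overhead} bytes per message)")
```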
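I want to sync multiple databases, and tables in different databases may have the same name. How do I avoid table-name collisions and get the sink connector to route Kafka topics so the data is inserted into the right database?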
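You'll want to use topic.prefix on the source connector to tag topics coming from a particular source, combined with the RegexRouter Single Message Transform (as you've already found) in the source and/or sink connector to further manipulate the topic names. You may need multiple sink connectors, each using topics.regex to select the specific topics to route to a specific database.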
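A minimal sketch of that layout, assuming one prefix per source database and one sink per target database; the names `br_orders`, `target_db_a`, and `target_db_b` are hypothetical, and in a real deployment each sink's `connection.url` is what points it at its database. Kafka matches `topics.regex` against the whole topic name, which `re.fullmatch` mirrors here:

```python
import re
from typing import Dict, Optional, Tuple

# Hypothetical setup: two source databases both contain a table auths_users.
# Each source connector gets its own topic.prefix, so topic names stay unique.
SOURCE_PREFIXES = {"br_auths": "br_auths__", "br_orders": "br_orders__"}

# One sink connector per target database; each subscribes with topics.regex.
# The capture group is what a RegexRouter with replacement "$1" would keep.
SINK_TOPICS_REGEX: Dict[str, str] = {
    "target_db_a": r"br_auths__(.*)",
    "target_db_b": r"br_orders__(.*)",
}


def route(topic: str) -> Optional[Tuple[str, str]]:
    """Return (target database, table name) for the sink whose regex matches."""
    for target_db, regex in SINK_TOPICS_REGEX.items():
        match = re.fullmatch(regex, topic)
        if match:
            return target_db, match.group(1)
    return None  # no sink subscribes to this topic


topics = [prefix + "auths_users" for prefix in SOURCE_PREFIXES.values()]
for topic in topics:
    print(topic, "->", route(topic))
```

Both topics end up writing to a table called `auths_users`, but each in a different target database, so the identical table names never collide.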