apache-kafka - Debezium 连接器发件箱转换
问题描述
我正在尝试将 MySql 源连接器与 debezium 支持的发件箱 SMT 一起使用,并具有以下配置。我正在使用最新的 debezium-core 和 debezium-mysql-connector (1.1) 罐子
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost: 8083/connectors/ -d '{
"name": "debezium-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "MySql",
"database.port": "3306",
"database.user": "**",
"database.password": "**",
"database.server.id": "1033113244",
"database.server.name": "anomaly-changelog",
"database.whitelist": "anomaly",
"database.history.kafka.bootstrap.servers": "Kafka:9092",
"database.history.kafka.topic": "anomaly.schema.history",
"transforms": "outbox,reroute",
"transforms.reroute.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.reroute.regex": "(.*)",
"transforms.reroute.replacement": "$1-SMT",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter"
}
}'
但我仍然收到以下错误:
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value io.debezium.transforms.outbox.EventRouter for configuration transforms.outbox.type: Class io.debezium.transforms.outbox.EventRouter could not be found.\nInvalid value null for configuration transforms.outbox.type: Not a Transformation}
我不明白为什么它没有被识别。
解决方案
你可以尝试这样的事情:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" demo:8083/connectors/ -d '{ "name": "order-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mariadb_order", "database.port": "3306", "database.user": "root", "database.password": "***", "database.server.id": "223344", "database.server.name": "orderdbserver","table.whitelist": "orderdb.outbox", "transforms": "outbox", "transforms.outbox.type" :"io.debezium.transforms.outbox.EventRouter", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic":"dbhistory.orderdb", "transforms.outbox.table.fields.additional.placement" : "aggregateid:envelope:id" } }'
推荐阅读
- apache-nifi - NiFi:QueryElasticSearchHTTP结果到属性
- java - Spring CXF Soap 客户端 OAuth2 客户端凭据
- django-models - 如何在 Django ModelForm 中显示 CHOICES
- java - 设计模式(命令模式)避免多个 if 条件
- join - 如何有效地将两列范围转换为扩展表?
- java - for循环中的迭代是什么意思?
- react-native - 自定义 Header 组件中屏幕转换上的标题动画?
- database - 创建多个表 sqlplus
- python-3.x - 列表索引必须是整数或切片,而不是 dict - 在特定情况下?
- google-bigquery - 将 UNIX 时间 (INT) 转换为 BigQuery 中的时间戳