postgresql - Kafka Connect:使用 debezium 从 Postgres 流式传输更改到主题
问题描述
我对 Kafka 和 Kafka Connect 世界还很陌生。我正在尝试使用 Kafka(在 MSK 上)、Kafka Connect(使用 PostgreSQL 的 Debezium 连接器)和 RDS Postgres 实例来实现 CDC。Kafka Connect 在我们部署在 AWS 的集群中的 K8 pod 中运行。
在深入了解所用配置的详细信息之前,我将尝试总结问题:
- 连接器启动后,它会按预期向主题发送消息(快照)
- 一旦我们对表进行任何更改(创建、更新、删除),就不会向该主题发送任何消息。我们希望看到有关对表所做更改的消息。
我的连接器配置如下所示:
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.user": "root",
"database.dbname": "insights",
"slot.name": "cdc_organization",
"tasks.max": "1",
"column.blacklist": "password, access_key, reset_token",
"database.server.name": "insights",
"database.port": "5432",
"plugin.name": "wal2json_rds_streaming",
"schema.whitelist": "public",
"table.whitelist": "public.kafka_connect_cdc_test",
"key.converter.schemas.enable": "false",
"database.hostname": "de-test-sre-12373.cbplqnioxomr.eu-west-1.rds.amazonaws.com",
"database.password": "MYSECRETPWD",
"value.converter.schemas.enable": "false",
"name": "source-postgres",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"snapshot.mode": "initial"
}
我们尝试了plugin.name
属性的不同配置wal2josn
:wal2json_streaming
和wal2json_rds_streaming
。
连接器和数据库之间的连接没有问题,因为我们已经看到连接器一启动就有消息流过。
上述连接器是否存在配置问题,导致我们无法看到与主题中出现的新更改相关的消息?
谢谢
解决方案
您的连接器配置看起来有点混乱。我对 Kafka 也很陌生,所以我真的不知道这个问题,但这是我的连接器配置,适合我。
{
"name":"<connector_name>",
"config": {
"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"database.server.name":"<server>",
"database.port":"5432",
"database.hostname":"<host>",
"database.user":"<user>",
"database.dbname":"<password>",
"tasks.max":"1",
"database.history.kafka.boostrap.servers":"localhost:9092",
"database.history.kafka.topic":"<kafka_topic_name>",
"plugin.name":"pgoutput",
"include.schema.changes":"true"
}
}
如果此配置也不起作用,请尝试查找日志控制台;有时错误不是控制台的最后一次写入
推荐阅读
- php - php -> 定时 bash 脚本总是关闭
- swift - 快速将可测试代码与静态方法调度相结合
- excel - Excel AVERAGEIFS looking up ONE of the criteria columns
- javascript - 数据绑定按键事件在 div 上不起作用
- elasticsearch - 如何从 Elastic Search 中的日志文件中获取总执行时间?
- kubernetes - Kubernetes 构建集群时出错,找不到实用程序子网
- cordova - 流星中的 CORS 与 Cordova
- mongodb - MongoDB:按两个字段索引的最快方法?
- python - How do I set optional arguments?
- c# - ElasticSearch C# NEST - scriptscorefunction 不起作用