sql-server - 使用 Debezium 将 MSSQL CDC 流式传输到 AWS MSK
问题描述
我是 Kafka 的新手,目前正在学习使用 Debezium 连接器将流数据从 MSSQL 更改为 Amazon MSK
我已经有一个启用了 CDC 的 MS SQL Server,一个 MSK 集群,我可以通过 EC2 客户端手动连接、创建主题、生成和使用数据。现在我正在设置一个带有 Debezium SQL Server 连接器的 MSK Connect 作为自定义插件,这是我的 MSK 连接器配置:
connector.class = io.debezium.connector.sqlserver.SqlServerConnector,
tasks.max = 1
database.hostname = xxx,
database.port = xxx,
database.user = xxx,
database.password = xxx,
database.dbname = dbName,
database.server.name = serverName,
table.include.list = dbo.tableName,
database.history.kafka.bootstrap.servers = xxx,
database.history.kafka.topic = xxx
但我的 MSK 连接器不断返回状态失败。虽然我已经搜索了谷歌,但似乎没有与我的想法相关的说明或指南。
这让我想知道我的解决方案是否可行?有人可以阐明一下并指出正确的方向吗?
已编辑:我从 CloudWatch 获得的一些日志
ERROR [AdminClient clientId=adminclient-1] Connection to node -2 () failed authentication due to: []: Access denied (org.apache.kafka.clients.NetworkClient:771)
INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [4f91d358-fb7b-4f3b-8930-1b4aefce6d0b]: Access denied
[Worker-08134a52fe88cdc49] MSK Connect encountered errors and failed.
非常感谢,
解决方案
如果您为 MSK 集群使用基于 IAM 角色的身份验证,您的引导服务器端口将为9098
除了所有属性,您还可以在 MSK 连接配置中发送这些属性
database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler