首页 > 解决方案 > 使用 Debezium 将 MSSQL CDC 流式传输到 AWS MSK

问题描述

我是 Kafka 的新手,目前正在学习使用 Debezium 连接器将流数据从 MSSQL 更改为 Amazon MSK

我已经有一个启用了 CDC 的 MS SQL Server,一个 MSK 集群,我可以通过 EC2 客户端手动连接、创建主题、生成和使用数据。现在我正在设置一个带有 Debezium SQL Server 连接器的 MSK Connect 作为自定义插件,这是我的 MSK 连接器配置:

connector.class = io.debezium.connector.sqlserver.SqlServerConnector, 
tasks.max = 1
database.hostname = xxx, 
database.port = xxx, 
database.user = xxx, 
database.password = xxx, 
database.dbname = dbName, 
database.server.name = serverName, 
table.include.list = dbo.tableName, 
database.history.kafka.bootstrap.servers = xxx, 
database.history.kafka.topic = xxx 

但我的 MSK 连接器不断返回状态失败。虽然我已经搜索了谷歌,但似乎没有与我的想法相关的说明或指南。

这让我想知道我的解决方案是否可行?有人可以阐明一下并指出正确的方向吗?

已编辑:我从 CloudWatch 获得的一些日志

ERROR [AdminClient clientId=adminclient-1] Connection to node -2 () failed authentication due to: []: Access denied (org.apache.kafka.clients.NetworkClient:771)

INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

[INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)

org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [4f91d358-fb7b-4f3b-8930-1b4aefce6d0b]: Access denied

[Worker-08134a52fe88cdc49] MSK Connect encountered errors and failed.

非常感谢,

标签: sql-serverapache-kafkadebeziumaws-msk

解决方案


如果您为 MSK 集群使用基于 IAM 角色的身份验证,您的引导服务器端口将为9098

除了所有属性,您还可以在 MSK 连接配置中发送这些属性

database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

参考:https ://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/


推荐阅读