首页 > 解决方案 > Debezium SQL Server Source Connector 设置 Kafka 代理

问题描述

我正在尝试设置 Confluent Kafka 平台的 docker 环境并与 Debezium SQL Server Source Connector 集成。

我遵循Kafka 平台的 Confluent 指南,然后遵循了 SQL Server 源连接器Debezium 教程。

我的代理容器名为broker,这个容器和其他容器(包括connect容器)在同一个网络中,我确保它们可以相互 ping 和 telnet。

在 Debezium 教程中,我停留在Start The Debezium SQL Server Connector的步骤中,因为我收到一个错误,表明连接器正在尝试通过localhost:9092而不是访问 Kafka 代理broker:9092

[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

直到它最终超时:

[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms

有趣的是,我可以在日志开头看到我的配置已成功接收(查找broker:9092):

[2020-03-29 10:45:38,618] INFO    database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)

...

[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [broker:9092]
        check.crcs = true
        client.dns.lookup = default
        client.id = server1-dbhistory
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = server1-dbhistory
        group.instance.id = null

这是我的配置文件register-sqlserver.json::

{
 "name": "inventory-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "server1",
     "database.hostname" : "sqlserver_1",
     "database.port" : "1433",
     "database.user" : "sa",
     "database.password" : "Password!",
     "database.dbname" : "testDB",
     "database.history.kafka.bootstrap.servers" : "broker:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }  

我通过主机添加连接器如下(就像指南一样):

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json  

我展示的日志是connect容器日志的输出。

我的完整日志中没有显示其他localhost词,因此不用担心localhost我可能会错过的其他默认值配置。

将不胜感激任何帮助:)

标签: dockerapache-kafkaapache-kafka-connectdebeziumconfluent-platform

解决方案


问题在于广告的听众

您正在连接到9092每个配置用于侦听器的代理,该侦听器将其主机宣传为localhost

KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092

这意味着客户端(在本例中为 Debezium)最初将连接到您提供给它的引导服务器(broker:9092),但代理会将广告主机()交还给客户端localhost- 然后客户端将尝试连接到该主机. 由于它们位于不同的实例上,localhost对于 Debeziu,连接器不是代理,因此连接失败。

参考:https ://rmoff.net/2018/08/02/kafka-listeners-explained/

解决方案:

使用 port 29092,根据上述配置绑定到broker广告主机,它将从 Debezium 容器正确解析

"database.history.kafka.bootstrap.servers" : "broker:29092"

推荐阅读