docker - Debezium SQL Server Source Connector 设置 Kafka 代理
问题描述
我正在尝试设置 Confluent Kafka 平台的 docker 环境并与 Debezium SQL Server Source Connector 集成。
我遵循了Kafka 平台的 Confluent 指南,然后遵循了 SQL Server 源连接器的Debezium 教程。
我的代理容器名为broker
,这个容器和其他容器(包括connect
容器)在同一个网络中,我确保它们可以相互 ping 和 telnet。
在 Debezium 教程中,我停留在Start The Debezium SQL Server Connector的步骤中,因为我收到一个错误,表明连接器正在尝试通过localhost:9092
而不是访问 Kafka 代理broker:9092
:
[2020-03-29 10:46:30,907] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,114] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:32,969] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:34,127] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:35,333] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-03-29 10:46:36,238] WARN [Consumer clientId=server1-dbhistory, groupId=server1-dbhistory] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
直到它最终超时:
[2020-03-29 10:46:38,664] ERROR WorkerSourceTask{id=inventory-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.common.errors.TimeoutException: Failed to get offsets by times in 60000ms
有趣的是,我可以在日志开头看到我的配置已成功接收(查找broker:9092
):
[2020-03-29 10:45:38,618] INFO database.history.kafka.bootstrap.servers = broker:9092 (io.debezium.connector.common.BaseSourceTask)
...
[2020-03-29 10:45:38,655] INFO ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [broker:9092]
check.crcs = true
client.dns.lookup = default
client.id = server1-dbhistory
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = server1-dbhistory
group.instance.id = null
这是我的配置文件register-sqlserver.json
::
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max" : "1",
"database.server.name" : "server1",
"database.hostname" : "sqlserver_1",
"database.port" : "1433",
"database.user" : "sa",
"database.password" : "Password!",
"database.dbname" : "testDB",
"database.history.kafka.bootstrap.servers" : "broker:9092",
"database.history.kafka.topic": "schema-changes.inventory"
}
}
我通过主机添加连接器如下(就像指南一样):
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
我展示的日志是connect
容器日志的输出。
我的完整日志中没有显示其他localhost
词,因此不用担心localhost
我可能会错过的其他默认值配置。
将不胜感激任何帮助:)
解决方案
问题在于广告的听众。
您正在连接到9092
每个配置用于侦听器的代理,该侦听器将其主机宣传为localhost
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
这意味着客户端(在本例中为 Debezium)最初将连接到您提供给它的引导服务器(broker:9092
),但代理会将广告主机()交还给客户端localhost
- 然后客户端将尝试连接到该主机. 由于它们位于不同的实例上,localhost
对于 Debeziu,连接器不是代理,因此连接失败。
参考:https ://rmoff.net/2018/08/02/kafka-listeners-explained/
解决方案:
使用 port 29092
,根据上述配置绑定到broker
广告主机,它将从 Debezium 容器正确解析
"database.history.kafka.bootstrap.servers" : "broker:29092"
推荐阅读
- java - 通过唯一名称查找实体,如果未找到则创建,填充然后保存。由于并发,两个事务节省了两倍
- python - 绘图仅在左侧一直绘图,并显示黑条而不是 x 轴标签
- julia - VegaLite.jl 忽略行的顺序
- vba - VBA:类模块:获取和让
- c# - 在 .net 核心的 class.cs 中传递 url 的最佳方法是什么?
- c++ - 是否可以禁用 p2p 并仅在 libtorrent 中使用 HTTPSeed
- c# - 为什么我的代码在同步时不能正常工作?
- airflow - 从配置文件中选择 DAG 相关常量而不重新启动气流
- python - Python twtter-bot 托管?
- javascript - 如何获取 fetch api 的字符串响应并将其分配给变量?