首页 > 解决方案 > 无法在分布式模式下启动 Kafka 连接以进行弹性搜索

问题描述

我正在尝试以分布式模式启动 Kafka 连接,即使在独立模式下我也无法继续

这是我的弹性搜索接收器属性

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=5
topics=fsp-audit
key.ignore=true
connection.url=https://****.amazonaws.com
type.name=kafka-connect
errors.tolerance = all
errors.deadletterqueue.topic.name = fsp-dlq-audit-event

这是我的连接-distributed.properties

bootstrap.servers=***:9092,***:9092,***:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
schema.enabled=false
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

我还预先创建了三个 topi

connect-offsets
connect-configs
connect-status

我在 EC2 上运行它并使用 MSK 作为 Kafka 。我检查了从我的 EC2 到 MSK 的连接,并且我能够远程访问

我得到这个错误

[2020-01-30 08:53:12,126] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,145] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,149] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:83)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:94)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)

问题:如果我必须在分布式模式下运行 Kafka 连接,我必须使用多个 EC2/vm 吗?

标签: apache-kafkaapache-kafka-connectaws-msk

解决方案


好的,在查看更多细节后,我发现问题出在 NACL 中,它阻止了几个子网 IP 地址。

所以,我检查了 MSK 端的安全组/网络 ACL/路由表,发现它们没问题。这意味着问题可能出在 EC2 实例上,因此我检查了实例的安全组/路由表,发现它们配置正确。

但是,在检查(acl-***)与 EC2 实例连接的网络 ACL 时,我看到有一个入站规则允许 0.0.0.0/0 用于临时端口,这应该允许代理与 EC2 实例通信。但是,查看出站规则,我看到它只允许存在 b-2 的子网范围,但它没有任何明确的出站规则来允许 b-3(10.**.**.0/24)或 b-4(10.**.**.0/24)子网范围。
当我添加新规则时,我能够 ping 并完全连接成功


推荐阅读