apache-kafka - 无法在分布式模式下启动 Kafka 连接以进行弹性搜索
问题描述
我正在尝试以分布式模式启动 Kafka 连接,即使在独立模式下我也无法继续
这是我的弹性搜索接收器属性
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=5
topics=fsp-audit
key.ignore=true
connection.url=https://****.amazonaws.com
type.name=kafka-connect
errors.tolerance = all
errors.deadletterqueue.topic.name = fsp-dlq-audit-event
这是我的连接-distributed.properties
bootstrap.servers=***:9092,***:9092,***:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
schema.enabled=false
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java
我还预先创建了三个 topi
connect-offsets
connect-configs
connect-status
我在 EC2 上运行它并使用 MSK 作为 Kafka 。我检查了从我的 EC2 到 MSK 的连接,并且我能够远程访问
我得到这个错误
[2020-01-30 08:53:12,126] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,145] INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:237)
org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call.
[2020-01-30 08:53:12,149] ERROR Stopping due to error (org.apache.kafka.connect.cli.ConnectDistributed:83)
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:45)
at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:94)
at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:77)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:58)
问题:如果我必须在分布式模式下运行 Kafka 连接,我必须使用多个 EC2/vm 吗?
解决方案
好的,在查看更多细节后,我发现问题出在 NACL 中,它阻止了几个子网 IP 地址。
所以,我检查了 MSK 端的安全组/网络 ACL/路由表,发现它们没问题。这意味着问题可能出在 EC2 实例上,因此我检查了实例的安全组/路由表,发现它们配置正确。
但是,在检查(acl-***)
与 EC2 实例连接的网络 ACL 时,我看到有一个入站规则允许 0.0.0.0/0 用于临时端口,这应该允许代理与 EC2 实例通信。但是,查看出站规则,我看到它只允许存在 b-2 的子网范围,但它没有任何明确的出站规则来允许 b-3(10.**.**.0/24)
或 b-4(10.**.**.0/24)
子网范围。
当我添加新规则时,我能够 ping 并完全连接成功
推荐阅读
- c++ - 无效打印(矢量
) 功能不打印 - java - 具有区域设置语言的 Android 应用程序无法在组件上正确显示文本
- node.js - node Js控制台输入格式?
- actions-on-google - Google Assistant Smarthome:查询风扇转速总是失败
- php - PHP按类别排名
- python - 在 python 中使用 cytpes 模块设置 MS 桌面背景太慢,无法像视频一样使用。
- javascript - 如何在html标签“必需”和javascript“onClick”之间排序
- java - 崩溃时崩溃并出现此异常:java.lang.RuntimeException: Unknown constant type 18
- javascript - 来自雄辩的javascript listTo Array的练习:为什么列表的值不是全局的null
- intellij-idea - Intellij Enum 在覆盖方法上缩进