首页 > 解决方案 > 将本地机器的火花流连接到 GCP 上的 kafka

问题描述

我目前在 GCP 上有 4 个虚拟机。1 是托管我的 zookeeper 的地方,另外 3 个是名为 kafka-0、kafka-1、kafka-2 的 kafka-brokers。在 VMS 中生成/使用消息时一切正常,但是当我尝试从本地计算机连接到 Kafka 时它开始失败。首先,我为端口 9092(在所有实例上)打开了防火墙规则。然后我为每个实例添加了一个静态外部 IP。我正在尝试从本地 spark 流式传输作业连接到我的 kafka 代理。

只是为了进行健全性检查,我的 zookeeper 能够连接到所有经纪人

i.ie 这个 bash 命令在我的 zookeeper 上运行(10.150.0.6:2181 是 zookeeper 的内部 IP)

zookeeper-shell.sh 10.150.0.6:2181 ls /brokers/ids 

给我以下输出

Connecting to 10.150.0.6:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0, 1, 2]

我的第一种方法。我尝试使用代理的外部 IP 连接到其中一个 kafka 代理

val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",
        "34.86.170.127:9092")
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .load()

这给了我以下错误

 java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known

我发现这个错误很有趣,因为即使我只给出它,它也会以某种方式在 GCP 上找出我的主机名。(这可以从“确认zookeeper-shell.sh localhost:2181 <<< "get /brokers/ids/0”)

所以我进一步研究了这个问题,并找到了这篇博文。 https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/并了解了广告监听器。然后我阅读了以下 stackoverflow 交流。

无法从本地机器连接到谷歌计算引擎上的 kafka 服务器

第二种方法

我尝试了答案。所以我虚拟机进入我的 kafka-0 代理实例并运行以下命令

vi kafka_2.12-2.0.0/config/server.properties

然后我取消了广告听众的注释并改变了

#advertised.listeners=PLAINTEXT://localhost:9092

advertised.listeners=PLAINTEXT://[instance_public_id_address]:9092

所以这解决了

advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092

但是,这仍然给我同样的错误

java.net.UnknownHostException: kafka-0.us-west2-c.c.civic-animal-213016.internal: nodename nor servname provided, or not known

所以尝试 3,我尝试更改而不是使用外部 IP 连接,我可以使用主机名代替

val df = sparkSession.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",
        "kafka-0.us-west2-c.c.civic-animal-213016.internal:9092")
      .option("subscribe", KAFKA_TOPIC_NAME_CONS)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", false)
      .load()

但我收到以下错误。

Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers

我很困惑如何解决这个问题?任何帮助我走得更远的步骤

标签: google-cloud-platformapache-kafkaspark-structured-streaming

解决方案


原来我对广告的内容感到困惑。listeners=PLAINTEXT://[instance_public_id_address]:9092

我变了

advertised.listeners=PLAINTEXT://kafka-0.us-west2-c.c.civic-animal-213016.internal:9092

对此

advertised.listeners=PLAINTEXT://34.86.170.127:9092

其中 34.86.170.127:9092 是我在 VM 实例上的外部 IP。


推荐阅读