首页 > 解决方案 > 升级所有 kafka-pod 后,java 中的 kafka 消费者客户端无法重新连接到 kubernetes kafka 代理

问题描述

我使用 spring-kafka(2.2.4.RELEASE) 来使用来自 kafka-server 的消息。kafka 客户端和服务器都部署在 k8s 集群中。通常,在 kafka 代理上生产和消费消息是可以的。但是当 kafka-brokers 升级时,kafka 客户端无法重新连接到 broker。

据我所知,当bootstrap-servers虚拟 ip 时,kafka 客户端重新连接有一个错误(详细信息在这里)。我的问题类似于vip错误。

在我的情况下,bootstrap-servers地址是k8s kafka服务名:port,当kafka-brokers升级时,kafka服务名对应的真实ip发生变化。所以kafka客户端永远不会重新连接成功。我怎样才能解决这个问题?

环境

Client Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:19:54Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"9", GitVersion:"v1.9.10", GitCommit:"098570796b32895c38a9a1c9286425fb1ececa18", GitTreeState:"clean", BuildDate:"2018-08-02T17:11:51Z", GoVersion:"go1.9.3", Compiler:"gc", Platform:"linux/amd64"}
> kubectl get svc -o wide -nbingotestdev|grep kafkadev
kafkadev                ClusterIP   None            <none>        9091/TCP                          1y        app=kafkadev
kafkadev-out            NodePort    10.68.206.93    <none>        9091:37142/TCP                    257d      app=kafkadev

> kubectl get pod -o wide -nbingotestdev|grep kafkadev
kafkadev-0                               1/1       Running             0          15h       172.20.10.59    10.171.113.45
kafkadev-1                               1/1       Running             0          15h       172.20.13.95    10.171.113.33
kafkadev-2                               1/1       Running             0          15h       172.20.2.173    10.171.113.62

标签: javakubernetesapache-kafkakafka-consumer-api

解决方案


您必须确保始终拥有一个静态分配的 IP 集,当消费者获取引导服务器时,无论是通过外部 DNS 服务还是使用 k8s api 客户端直接检查正在运行的 Kafka 服务,该 IP 集都会作为广告侦听器返回,然后获取所有地址以构建您的引导服务器字符串


推荐阅读