Connecting to multiple Kafka servers with Spring Boot

Problem description

In a Spring Boot application, I want to connect to 2 different Kafka servers at the same time. I am using KafkaAdmin and AdminClient to establish the connections and perform CRUD operations.

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();

        // Both settings below are JVM-wide system properties, not per-client options.
        System.setProperty("java.security.krb5.conf", krb5Location);
        System.setProperty("java.security.auth.login.config", jaasConfigLocation);

        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        configs.put("security.protocol", "SASL_SSL");
        configs.put("ssl.truststore.location", sslTruststoreLocation);
        configs.put("ssl.truststore.password", sslTruststorePassowrd);

        return new KafkaAdmin(configs);
    }

    // @PostConstruct dropped: it is meant for void init methods, not @Bean factory methods.
    @Bean
    public AdminClient config() {
        // Build the client from the KafkaAdmin bean defined above.
        return AdminClient.create(kafkaAdmin().getConfig());
    }

Server 2 is configured the same way, in the same Spring Boot application.
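One thing worth noting, assuming the bean for server 2 sets the same two system properties as the `kafkaAdmin()` method above: system properties are shared by the whole JVM, so whichever bean is initialized last overwrites the other's values. The sketch below only illustrates that overwrite with plain Java; the class name, method name, and file paths are hypothetical and not taken from the question.

```java
public class JvmWideKerberosSettings {

    // Mimics what each kafkaAdmin() bean does: point the JVM at a krb5 and a JAAS file.
    public static void applyClusterSettings(String krb5Conf, String jaasConf) {
        System.setProperty("java.security.krb5.conf", krb5Conf);
        System.setProperty("java.security.auth.login.config", jaasConf);
    }

    public static void main(String[] args) {
        // Hypothetical paths standing in for the two clusters' files.
        applyClusterSettings("/etc/cluster1/krb5.conf", "/etc/cluster1/jaas.conf");
        applyClusterSettings("/etc/cluster2/krb5.conf", "/etc/cluster2/jaas.conf");

        // Only the second call's values remain visible to every client in the JVM.
        System.out.println(System.getProperty("java.security.krb5.conf"));
        System.out.println(System.getProperty("java.security.auth.login.config"));
    }
}
```

If that assumption holds, a client created for server 1 may end up reading the JAAS or krb5 settings intended for server 2, which would be consistent with a mismatch between the client and service principals during authentication.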

If I load the configuration for both Kafka servers at the same time during application initialization, the following error is shown:

>>>KRBError:
     cTime is Sun Jun 03 14:23:02 IST 2001 991558382000
     sTime is Tue Nov 20 10:46:53 IST 2018 1542691013000
     suSec is 512097
     error code is 7
     error Message is Server not found in Kerberos database
     cname is config1@servername.com
     sname is config2@servernname.com
     msgType is 30
    at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:73)
    at sun.security.krb5.KrbTgsReq.getReply(KrbTgsReq.java:251)
    at sun.security.krb5.KrbTgsReq.sendAndGetCreds(KrbTgsReq.java:262)
    at sun.security.krb5.internal.CredentialsUtil.serviceCreds(CredentialsUtil.java:308)
    at sun.security.krb5.internal.CredentialsUtil.acquireServiceCreds(CredentialsUtil.java:126)
    at sun.security.krb5.Credentials.acquireServiceCreds(Credentials.java:458)
    at sun.security.jgss.krb5.Krb5Context.initSecContext(Krb5Context.java:693)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:248)
    at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
    at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:361)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:359)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:269)
    at org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:206)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:81)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:474)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1006)
    at java.lang.Thread.run(Thread.java:748)
Caused by: KrbException: Identifier doesn't match expected value (906)
    at sun.security.krb5.internal.KDCRep.init(KDCRep.java:140)
    at sun.security.krb5.internal.TGSRep.init(TGSRep.java:65)
    at sun.security.krb5.internal.TGSRep.<init>(TGSRep.java:60)
    at sun.security.krb5.KrbTgsRep.<init>(KrbTgsRep.java:55)
    ... 22 more
2018-11-20 10:46:53.605 ERROR 8672 --- [| adminclient-4] org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-4] Connection to node -1 failed authentication due to: An error: (java.security.PrivilegedActionException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Server not found in Kerberos database (7) - UNKNOWN_SERVER)]) occurred when evaluating SASL token received from the Kafka Broker. This may be caused by Java's being unable to resolve the Kafka Broker's hostname correctly. You may want to try to adding '-Dsun.net.spi.nameservice.provider.1=dns,sun' to your client's JVMFLAGS environment. Users must configure FQDN of kafka brokers when authenticating using SASL and `socketChannel.socket().getInetAddress().getHostName()` must match the hostname in `principal/hostname@realm` Kafka Client will go to AUTHENTICATION_FAILED state.

Tags: java, apache-kafka, kerberos, spring-kafka

Solution
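One approach that may help, offered as a sketch rather than a confirmed fix for this setup: Kafka clients accept a per-client `sasl.jaas.config` property, so each cluster's login can be configured in its own config map instead of through the JVM-wide `java.security.auth.login.config` system property. The example below uses only plain string keys so it runs without Kafka on the classpath. The class and method names, host names, principals, keytab paths, and the `kafka` service name are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class PerClusterAdminConfig {

    // Builds an inline JAAS entry for a keytab-based Kerberos login.
    public static String jaasConfig(String principal, String keytabPath) {
        return "com.sun.security.auth.module.Krb5LoginModule required "
                + "useKeyTab=true storeKey=true "
                + "keyTab=\"" + keytabPath + "\" "
                + "principal=\"" + principal + "\";";
    }

    // Builds one self-contained admin config map per cluster.
    public static Map<String, Object> adminConfig(String bootstrapServers,
                                                  String principal,
                                                  String keytabPath,
                                                  String truststoreLocation,
                                                  String truststorePassword) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("security.protocol", "SASL_SSL");
        configs.put("sasl.mechanism", "GSSAPI");
        // Assumption: the brokers' Kerberos service name is "kafka".
        configs.put("sasl.kerberos.service.name", "kafka");
        // Per-client login settings, scoped to this map only.
        configs.put("sasl.jaas.config", jaasConfig(principal, keytabPath));
        configs.put("ssl.truststore.location", truststoreLocation);
        configs.put("ssl.truststore.password", truststorePassword);
        return configs;
    }

    public static void main(String[] args) {
        // Hypothetical values for two separate clusters.
        Map<String, Object> cluster1 = adminConfig("broker1.example.com:9093",
                "client1@EXAMPLE.COM", "/etc/security/client1.keytab",
                "/etc/ssl/truststore1.jks", "changeit");
        Map<String, Object> cluster2 = adminConfig("broker2.example.org:9093",
                "client2@EXAMPLE.ORG", "/etc/security/client2.keytab",
                "/etc/ssl/truststore2.jks", "changeit");

        System.out.println(cluster1.get("sasl.jaas.config"));
        System.out.println(cluster2.get("sasl.jaas.config"));
    }
}
```

Each map could then be passed to its own `new KafkaAdmin(configs)` bean, with the two beans given distinct names. Note that `java.security.krb5.conf` remains a JVM-wide setting, so if the two clusters belong to different Kerberos realms, a single krb5.conf would need to describe both.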

