首页 > 解决方案 > 如何将我的示例 kafka 生产者应用程序连接到远程 kafka 服务器?

问题描述

https://www.javainuse.com/spring/spring-boot-apache-kafka-hello-world

我按照这个示例构建了一个简单的 Kafka 生产者进行一些测试。但是,我的 Kafka 实例不是本地的,由我公司的基础架构团队管理,因此它运行在远程服务器上。我有它正在运行的地址和端口,但我不确定如何配置我的应用程序以连接到它。我是否需要使用 application.properties 文件来定义 kafka 地址池或类似的东西?

spring.kafka.bootstrap-servers=[kafka-server-1:port],[kafka-server-2:port]
spring.kafka.consumer.group-id=my-sample-group
spring.kafka.security.protocol=SSL

在我为消费者关注的教程中,我的 application.properties 中有类似的内容,但我不确定生产者是否相同或应该不同。这里的第二个值似乎特定于消费者,并且基于我的 IDE 中的自动完成,我没有看到生产者的类似值。

任何帮助表示赞赏

我已经更改了我的代码并按照https://docs.spring.io/spring-kafka/reference/html/#spring-boot-producer-app中的示例进行操作

现在,当我尝试连接时,输出只是在关于我的连接池中的 2 个 kafka 服务器的 WARN 消息之间交替:

2021-11-10 11:44:29.609 WARN 11839 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -2 (kafka1.mycompany.com/[some-ip-address]:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

2021-11-10 11:44:29.798 WARN 11839 --- [| adminclient-1] org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-1] Connection to node -1 (kafka2.mycompany.com/[some-ip-address]:9092) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.

一开始它似乎输出了一些关于 kafka 设置的信息。也许那里有一些关于我的 application.properties 中可能需要哪些附加参数才能实际连接的信息。

bootstrap.servers = [kafka1.mycompany.com:9092, kafka2.mycompany.com:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = SSL
    security.providers = null
    send.buffer.bytes = 131072
    socket.connection.setup.timeout.max.ms = 127000
    socket.connection.setup.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.certificate.chain = null
    ssl.keystore.key = null
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.certificates = null
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

标签: spring-bootapache-kafkaspring-kafka

解决方案


您遵循的教程非常过时。我建议查看官方文档:https ://docs.spring.io/spring-kafka/reference/html/#spring-boot-producer-app 。根据您的设置,配置可能只是一个引导服务器。

spring.kafka.bootstrap-servers=kafka.your-company.com:9092

我会问 Kafka ops 团队如何连接,因为使用 jaas 和 ssl 会变得很复杂。

Spring Boot Kafka 配置设置可以在这里找到:https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.admin。客户编号


推荐阅读