首页 > 解决方案 > 使用 srimzi 在 Openshift 上设置 Kafka

问题描述

我正在尝试使用本指南在 Openshift 平台上设置 kafka 集群: https ://developers.redhat.com/blog/2018/10/29/how-to-run-kafka-on-openshift-the-enterprise -kubernetes-with-amq-streams/

我的 zookeeper 和 kafka 集群运行如下所示: 豆荚 当我的应用程序作为引导服务器运行时,我输入到 my-cluster-kafka-external 引导程序的路由。但是当我尝试向 Kafka 发送消息时,我收到了这条消息:

21:32:40.548 [http-nio-8080-exec-1] ERROR o.s.k.s.LoggingProducerListener () - Exception thrown when sending a message with key='key' and payload='Event(id=null, number=30446C77213B40000004tgst15, itemId=, serialNumber=0,  locat...' to topic tag-topic:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

该主题已成功创建,并且在我的计算机上使用本地 kafka 运行时应用程序运行良好。那么我做错了什么,为什么我无法访问Kafka并发送消息?

这是我在 spring-kafka 中的 kafka 生产者配置:

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;    

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "........kafka.EventSerializer");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

        return props;
    }


    @Bean
    public ProducerFactory<String, Event> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

编辑:我将日志记录级别设置为调试并发现:

23:59:27.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initialize connection to node my-cluster-kafka-bootstrap-kafka-test............... (id: -1 rack: null) for sending metadata request
23:59:27.412 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initiating connection to node my-cluster-kafka-bootstrap-kafka-test............ (id: -1 rack: null)
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.n.Selector () - [Consumer clientId=consumer-1, groupId=id] Created socket with SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node -1
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Completed connection to node -1. Fetching API versions.
23:59:28.010 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Initiating API versions fetch from node -1.
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.n.Selector () - [Consumer clientId=consumer-1, groupId=id] Connection with my-cluster-kafka-bootstrap-kafka-test........../52.215.40.40 disconnected
java.io.EOFException: null
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547) ~[kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.common.network.Selector.poll(Selector.java:412) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:258) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:230) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:221) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:153) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:205) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:284) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) [kafka-clients-1.0.2.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111) [kafka-clients-1.0.2.jar:?]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:700) [spring-kafka-2.1.10.RELEASE.jar:2.1.10.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:514) [?:?]
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) [?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
    at java.lang.Thread.run(Thread.java:844) [?:?]
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Node -1 disconnected.
23:59:28.510 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] DEBUG o.a.k.c.NetworkClient () - [Consumer clientId=consumer-1, groupId=id] Give up sending metadata request since no node is available
2

这和connections.max.idle.ms经纪人的财产有关系吗?这里有人遇到了类似的问题。

我尝试kafka-console-producer通过运行以下命令来使用:

bin\windows\kafka-console-producer --broker-list https://my-cluster-kafka-bootstrap-kafka-test.domain.com:443 --topic tag-topic --producer.config config/producer.properties

并在 producer.properties 中使用此配置:

compression.type=none
security.protocol=SSL
ssl.truststore.location=C:\\Tools\\kafka_2.12-2.2.0\\config\\store.jks
ssl.truststore.password=password
ssl.keystore.location=C:\\Tools\\kafka_2.12-2.2.0\\config\\store.jks
ssl.keystore.password=password
ssl.key.password=password

但我收到回复说连接在身份验证时终止:

[2019-05-21 16:15:58,444] WARN [Producer clientId=console-producer] Connection to node 1 (my-cluster-kafka-1-kafka-test.domain.com/52.xxx.xx.40:443) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

有没有办法证明openshift的证书是错误的?

标签: javaapache-kafkaopenshiftspring-kafkastrimzi

解决方案


只有使用 Strimzi 生成的 CA 证书才能通过 TLS 通过路由进行访问,您必须按照文章中的说明提取该证书。然后,您必须创建一个密钥库来导入证书并将其提供给客户端应用程序。我在您的生产者中没有看到这样的配置。


推荐阅读