首页 > 解决方案 > java.lang.IllegalStateException:此错误处理程序无法处理'org.apache.kafka.common.errors.SslAuthenticationException's;

问题描述

我有一个 Spring Boot 应用程序,我试图通过 SSL 连接到 Kafka 集群。错误日志:

[ERROR] 2020-12-21 17:54:06,570 org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - {} - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.SslAuthenticationException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
    at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
    at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLProtocolException: Unexpected handshake message: server_hello
    at sun.security.ssl.Alert.createSSLException(Alert.java:129) ~[?:?]
    at sun.security.ssl.Alert.createSSLException(Alert.java:117) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:307) ~[?:?]
    at sun.security.ssl.TransportContext.fatal(TransportContext.java:263) ~[?:?]
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed

请找到以下 Kafka 配置属性:

        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaserver)
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, '5000')
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, '300000')
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, '300001')
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER)
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        propsMap.put(SslConfigs.SSL_PROTOCOL_CONFIG,DTOConstants.SSL)
        propsMap.put('security.protocol', DTOConstants.SSL)
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000")
        propsMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.trustStoreLocation)
        propsMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.trustStorePassword)
        propsMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.keyStoreLocation)
        propsMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.keyStorePassword)
        propsMap.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.keyPassword)
        propsMap.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, null)

标签: javaspringspring-bootsslapache-kafka

解决方案


推荐阅读