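java - java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.SslAuthenticationException's
Problem description
I have a Spring Boot application that I am trying to connect to a Kafka cluster over SSL. The consumer fails with the following error log: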
[ERROR] 2020-12-21 17:54:06,570 org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - {} - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.SslAuthenticationException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:103) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1265) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1022) ~[spring-kafka-2.5.5.RELEASE.jar:2.5.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLProtocolException: Unexpected handshake message: server_hello
at sun.security.ssl.Alert.createSSLException(Alert.java:129) ~[?:?]
at sun.security.ssl.Alert.createSSLException(Alert.java:117) ~[?:?]
at sun.security.ssl.TransportContext.fatal(TransportContext.java:307) ~[?:?]
at sun.security.ssl.TransportContext.fatal(TransportContext.java:263) ~[?:?]
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
These are the Kafka configuration properties I am using:
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaserver);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "300000");
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "300001");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONSUMER);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
propsMap.put(SslConfigs.SSL_PROTOCOL_CONFIG, DTOConstants.SSL);
propsMap.put("security.protocol", DTOConstants.SSL);
propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "30000");
propsMap.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, this.trustStoreLocation);
propsMap.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, this.trustStorePassword);
propsMap.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, this.keyStoreLocation);
propsMap.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, this.keyStorePassword);
propsMap.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, this.keyPassword);
propsMap.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, null);
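Note that the configuration above writes the same constant, DTOConstants.SSL, to both `ssl.protocol` and `security.protocol`, although the two settings mean different things: `security.protocol` selects the transport (for example `SSL`), while `ssl.protocol` names the protocol used to create the SSLContext (for example `TLSv1.2`). The following is only a sketch of how the SSL part could be written with the two values kept separate. The class name `KafkaSslProps`, the method `sslProps`, and the choice of `TLSv1.2` are assumptions made for illustration; whether pinning the TLS version resolves this particular handshake failure depends on what the brokers support and is not confirmed by the question.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the original application.
public class KafkaSslProps {

    // Builds only the SSL-related consumer properties, using Kafka's
    // documented configuration key names as plain strings.
    public static Map<String, Object> sslProps(String trustStoreLocation,
                                               String trustStorePassword,
                                               String keyStoreLocation,
                                               String keyStorePassword,
                                               String keyPassword) {
        Map<String, Object> props = new HashMap<>();
        // Transport selection: encrypt the client-broker connection.
        props.put("security.protocol", "SSL");
        // Protocol used to create the SSLContext. TLSv1.2 is an assumption
        // here; use whatever version the brokers actually accept.
        props.put("ssl.protocol", "TLSv1.2");
        // Restrict the enabled protocols to the same assumed version.
        props.put("ssl.enabled.protocols", "TLSv1.2");
        props.put("ssl.truststore.location", trustStoreLocation);
        props.put("ssl.truststore.password", trustStorePassword);
        props.put("ssl.keystore.location", keyStoreLocation);
        props.put("ssl.keystore.password", keyStorePassword);
        props.put("ssl.key.password", keyPassword);
        // An empty string disables server hostname verification.
        props.put("ssl.endpoint.identification.algorithm", "");
        return props;
    }
}
```

The returned map could be merged into the existing one with `propsMap.putAll(KafkaSslProps.sslProps(...))`, replacing the individual SSL `put` calls.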
Solution