ssl - Issue connecting to SSL secured Kafka
Problem description
I'm trying to get a Kafka Spark Streaming consumer to work with a Kafka broker secured with SSL. The consumer runs on a Kerberised Hadoop cluster. My consumer code uses the following config:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="(username)" password="(password)";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
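For reference, here is a minimal sketch of how these three settings could be assembled in Java before being handed to the consumer. The class and method names (`SaslSslConsumerConfig`, `kafkaParams`) are hypothetical and not taken from my actual code, and the sketch assumes the credentials contain no double quotes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds the consumer settings listed above as a map.
final class SaslSslConsumerConfig {

    static Map<String, Object> kafkaParams(String username, String password) {
        Map<String, Object> params = new HashMap<>();
        params.put("security.protocol", "SASL_SSL");
        params.put("sasl.mechanism", "PLAIN");
        // Assumption: username and password contain no double quotes,
        // otherwise the JAAS entry would need escaping.
        params.put("sasl.jaas.config", String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
            username, password));
        return params;
    }

    public static void main(String[] args) {
        // Print each setting in key=value form.
        kafkaParams("(username)", "(password)")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the spark-streaming-kafka-0-10 integration, a map like this would typically be passed as the `kafkaParams` argument of `ConsumerStrategies.Subscribe`.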
The consumer fails with the following exception:
Caused by: java.lang.NullPointerException
at org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163) ~[kafka-clients-0.10.0-kafka-2.1.0.jar:?]
at org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.<init>(SaslRpcServer.java:381) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:75) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.io.retry.RetryInvocationHandler.<init>(RetryInvocationHandler.java:66) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:730) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:673) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:155) ~[hadoop-hdfs-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:98) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2835) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:387) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) ~[hadoop-common-2.6.0-cdh5.12.1.jar:?]
at org.apache.spark.internal.io.SparkHadoopWriter.open(SparkHadoopWriter.scala:82) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1133) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
at org.apache.spark.scheduler.Task.run(Task.scala:108) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338) ~[spark-core_2.11-2.2.0.cloudera2.jar:2.2.0.cloudera2]
... 3 more
After some initial analysis, I found that the SaslRpcServer class in hadoop-common initializes FastSaslServerFactory with a null property map. The PlainSaslServer class in kafka-clients then tries to read from this map, which leads to the NPE.
Am I missing something? There are not a lot of posts on this issue on the net so any pointers are appreciated.
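The failure pattern described above can be sketched in isolation. This is a simplified illustration, not the actual Kafka or Hadoop source: the class `NullPropsSketch` and both method names are hypothetical, and the bodies only assume that the factory looks up a policy key in the property map it is given.

```java
import java.util.Map;
import javax.security.sasl.Sasl;

// Hypothetical, simplified stand-in for a SASL server factory's
// getMechanismNames(props) when the caller passes a null property map.
final class NullPropsSketch {

    // Dereferences props without a null check, so a null map throws an NPE.
    static String[] mechanismNamesUnsafe(Map<String, ?> props) {
        String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT);
        return "true".equals(noPlainText) ? new String[0] : new String[] {"PLAIN"};
    }

    // Treats a null map as "no policies set" and still advertises PLAIN.
    static String[] mechanismNamesNullSafe(Map<String, ?> props) {
        if (props == null) {
            return new String[] {"PLAIN"};
        }
        String noPlainText = (String) props.get(Sasl.POLICY_NOPLAINTEXT);
        return "true".equals(noPlainText) ? new String[0] : new String[] {"PLAIN"};
    }

    public static void main(String[] args) {
        System.out.println(String.join(",", mechanismNamesNullSafe(null)));
        try {
            mechanismNamesUnsafe(null);
        } catch (NullPointerException e) {
            System.out.println("unsafe variant threw NullPointerException");
        }
    }
}
```

If this reading is right, the NPE surfaces during HDFS access only because Hadoop enumerates every registered SASL server factory, including the one contributed by kafka-clients.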
Solution
I have solved this problem. It is caused by the kafka-clients version. You can upgrade kafka-clients to 0.11.
Kafka clients at version 0.11.0 can talk to brokers at version 0.10.0 or newer, so you can upgrade with confidence!
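After upgrading, it may be worth confirming which jar the class from the stack trace is actually loaded from, since a cluster-provided kafka-clients jar could still take precedence on the classpath. The following is a rough sketch using only the JDK. The class name `WhichJar` and the method `locate` are hypothetical, and inside a Spark executor the relevant class loader may differ from the one used here.

```java
import java.security.CodeSource;

// Hypothetical diagnostic: reports where a class was loaded from.
final class WhichJar {

    static String locate(String className) {
        try {
            // Load without initializing, using this class's own loader.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // JDK core classes typically report no code source.
            return source == null
                ? "(bootstrap or unknown source)"
                : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class name taken from the first frame of the stack trace.
        System.out.println(locate("org.apache.kafka.common.security.plain.PlainSaslServer"));
    }
}
```

If the printed location still points at the 0.10.0 jar, the upgraded dependency is probably being shadowed by the cluster's copy.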