apache-spark - 使用kafka和HBase引起的冲突
问题描述
当我使用“spark streaming”读取“kafka”(需要 sasl 验证)然后将数据存储到“HBase”时,“HBase”会出现以下错误
java.io.IOException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240) at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory .java:218) 在 org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:181) 在 com.xueersi.datamining.ups.database.implement.HbaseClient.connect(HbaseClient.scala:91) 在com.xueersi.datamining.ups.stream.start.BaseInfoLogAnalysisStart$$anonfun$main$1$$anonfun$apply$2.apply(BaseInfoLogAnalysisStart.scala:78) at com.xueersi.datamining.ups.stream.start.BaseInfoLogAnalysisStart$$ anonfun$main$1$$anonfun$apply$2.apply(BaseInfoLogAnalysisStart.scala:75) 在 org.apache.spark.rdd。RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala :925) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1956) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1956) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:99) 在 org.apache.spark.executor.Executor$TaskRunner。在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread 的 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 运行(Executor.scala:325) .run(Thread.java:748) 引起:java.lang.reflect。在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect 的 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 的 InvocationTargetException。 org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238) 的 Constructor.newInstance(Constructor.java:423) ... 15 更多原因:org.apache.hadoop 的 java.lang.ExceptionInInitializerError .hbase.ClusterId.parseFrom(ClusterId.java:64) 在 org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:75) 在 org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry .java:105) 在 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.(ConnectionManager.java:658) 的 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:931) ...还有 20 个原因:org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.(SaslRpcServer) 的 org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163) 的 java.lang.NullPointerException .java:381)在 org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186) 在 org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570) 在 org.apache.hadoop。 hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418) 在 org.apache.hadoop.hdfs。NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314) 在 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68) 在 org.apache.hadoop.hdfs.server.namenode。 ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.(RetryInvocationHandler.java:75) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.(RetryInvocationHandler.java: 66) 在 org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) 在 org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181) 在 org.apache.hadoop.hdfs .DFSClient.(DFSClient.java:762) 在 org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:693) 在 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158) 在 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816) 在 org.apache.hadoop.fs.FileSystem .access$200(FileSystem.java:98) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:第2835章.get(FileSystem.java:371) 在 org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 在 org.apache.hadoop.hbase.util.DynamicClassLoader.initTempDir(DynamicClassLoader.java:120) 在org.apache.hadoop.hbase.util。DynamicClassLoader.(DynamicClassLoader.java:98) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.(ProtobufUtil.java:246) ... 25 更多
但是当我阅读另一个“Kafka”(没有sasl验证)时,“HBase”没有问题。另外,“kerberos”认证需要“HBase”我认为kafka的sasl认证和hbase的kerberos认证有冲突是有人可以给我一些建议吗?
解决方案
我似乎找到了答案:https ://issues.apache.org/jira/browse/KAFKA-5294
然后我手动指定依赖(我以前使用的版本是0.10.2.1)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.11.0.0</version>
<scope>compile</scope>
</dependency>
有用
推荐阅读
- php - Router / URL Matcher 类的问题
- oracle - 如何在 oracle apex 5.0 中将值从一页传递到另一页?
- python - 使用长度不均匀的列表项创建pandas df列?
- sql - 如何在函数中使用全局临时表?
- c++ - C++ 多个 Lua 状态
- shutdown - 运行在 Server 2016 关闭之前发送电子邮件的任务计划程序批处理文件
- bash - 如何在bash中执行通过grep和sed提取的文本
- ansible - 我可以为给定 Ansible 环境中的所有主机设置默认的 host_vars 文件吗?
- sql-server - 是否可以通过索引强制执行此业务规则?
- google-apps-script - 谷歌表格:带有范围的 INDIRECT()