首页 > 解决方案 > 使用kafka和HBase引起的冲突

问题描述

当我使用“spark streaming”读取“kafka”(需要 sasl 验证)然后将数据存储到“HBase”时,“HBase”会出现以下错误

java.io.IOException: java.lang.reflect.InvocationTargetException at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:240) at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory .java:218) 在 org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:181) 在 com.xueersi.datamining.ups.database.implement.HbaseClient.connect(HbaseClient.scala:91) 在com.xueersi.datamining.ups.stream.start.BaseInfoLogAnalysisStart$$anonfun$main$1$$anonfun$apply$2.apply(BaseInfoLogAnalysisStart.scala:78) at com.xueersi.datamining.ups.stream.start.BaseInfoLogAnalysisStart$$ anonfun$main$1$$anonfun$apply$2.apply(BaseInfoLogAnalysisStart.scala:75) 在 org.apache.spark.rdd。RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala :925) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1956) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1956) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 在 org.apache.spark.scheduler.Task.run(Task.scala:99) 在 org.apache.spark.executor.Executor$TaskRunner。在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 在 java.lang.Thread 的 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 运行(Executor.scala:325) .run(Thread.java:748) 引起:java.lang.reflect。在 sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) 在 sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 在 java.lang.reflect 的 sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 的 InvocationTargetException。 org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238) 的 Constructor.newInstance(Constructor.java:423) ... 15 更多原因:org.apache.hadoop 的 java.lang.ExceptionInInitializerError .hbase.ClusterId.parseFrom(ClusterId.java:64) 在 org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:75) 在 org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry .java:105) 在 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.(ConnectionManager.java:658) 的 org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:931) ...还有 20 个原因:org.apache.hadoop.security.SaslRpcServer$FastSaslServerFactory.(SaslRpcServer) 的 org.apache.kafka.common.security.plain.PlainSaslServer$PlainSaslServerFactory.getMechanismNames(PlainSaslServer.java:163) 的 java.lang.NullPointerException .java:381)在 org.apache.hadoop.security.SaslRpcServer.init(SaslRpcServer.java:186) 在 org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:570) 在 org.apache.hadoop。 hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418) 在 org.apache.hadoop.hdfs。NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314) 在 org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider$DefaultProxyFactory.createProxy(ConfiguredFailoverProxyProvider.java:68) 在 org.apache.hadoop.hdfs.server.namenode。 ha.ConfiguredFailoverProxyProvider.getProxy(ConfiguredFailoverProxyProvider.java:152) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.(RetryInvocationHandler.java:75) 在 org.apache.hadoop.io.retry.RetryInvocationHandler.(RetryInvocationHandler.java: 66) 在 org.apache.hadoop.io.retry.RetryProxy.create(RetryProxy.java:58) 在 org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:181) 在 org.apache.hadoop.hdfs .DFSClient.(DFSClient.java:762) 在 org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:693) 在 org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:158) 在 org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2816) 在 org.apache.hadoop.fs.FileSystem .access$200(FileSystem.java:98) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2853) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:第2835章.get(FileSystem.java:371) 在 org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 在 org.apache.hadoop.hbase.util.DynamicClassLoader.initTempDir(DynamicClassLoader.java:120) 在org.apache.hadoop.hbase.util。DynamicClassLoader.(DynamicClassLoader.java:98) at org.apache.hadoop.hbase.protobuf.ProtobufUtil.(ProtobufUtil.java:246) ... 25 更多

但是当我阅读另​​一个“Kafka”(没有sasl验证)时,“HBase”没有问题。另外,“kerberos”认证需要“HBase”我认为kafka的sasl认证和hbase的kerberos认证有冲突是有人可以给我一些建议吗?

标签: apache-sparkapache-kafkahbase

解决方案


我似乎找到了答案:https ://issues.apache.org/jira/browse/KAFKA-5294

然后我手动指定依赖(我以前使用的版本是0.10.2.1)

<dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.11.0.0</version>
        <scope>compile</scope>
</dependency>

有用


推荐阅读