Spark (14): SparkSQL Integration with Hive

wh984763176 2020-08-10 15:01

1. Built-in Hive

If you use Spark's built-in Hive, nothing extra needs to be configured; it can be used directly.

Hive's metadata is stored in Derby, and the default warehouse location is $SPARK_HOME/spark-warehouse.
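A quick way to see this from spark-shell, with no configuration at all (the table name src_demo below is invented purely for illustration):

// Run inside spark-shell started without any Hive configuration:
// metadata goes to an embedded Derby database (metastore_db) and table data to the spark-warehouse directory
spark.sql("create table if not exists src_demo(id int, name string)")
spark.sql("insert into src_demo values(1, 'zs')")
spark.sql("select * from src_demo").show()
spark.sql("show tables").show()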

In practice, however, the built-in Hive is almost never used.

2. Integrating an External Hive

spark-shell

① Copy hive-site.xml from the Hive conf directory (/opt/module/hive/conf/hive-site.xml) into $SPARK_HOME/conf/:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- JDBC URL of the metastore; hive_metastore is the MySQL database that stores the metadata -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hadoop102:3306/hive_metastore?useSSL=false</value>
    </property>

    <!-- JDBC driver class -->
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>

    <!-- MySQL username for the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>

    <!-- MySQL password for the JDBC connection -->
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>root</value>
    </property>

    <!-- Hive's default working directory on HDFS, where table data is stored -->
    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/user/hive/warehouse</value>
    </property>

    <!-- Disable verification of the metastore schema version -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>

    <!-- Address of the metastore service -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083</value>
    </property>

    <!-- Port for hiveserver2 connections -->
    <property>
        <name>hive.server2.thrift.port</name>
        <value>10000</value>
    </property>

    <!-- Host for hiveserver2 connections -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>hadoop102</value>
    </property>
</configuration>

② Copy the MySQL JDBC driver from Hive's /lib directory into $SPARK_HOME/jars/.

③ Restart spark-shell; Hive data can then be queried directly, as in the quick check below.
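A minimal sanity check from the restarted spark-shell might look like the following (the database and table names are the ones used later in this article; substitute your own):

// Inside spark-shell: the built-in `spark` session is now connected to the external metastore
spark.sql("show databases").show()
spark.sql("use sparksqltest")
spark.sql("select * from user_visit_action").show()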

Development in IDEA

① pom dependencies

 <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.27</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.10.1</version>
        </dependency>
    </dependencies>

② Add hive-site.xml to the resources directory, keeping only the metastore connection configuration:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>

    <!-- Address of the metastore service -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop102:9083</value>
    </property>
</configuration>
③ Write the Spark program:

package com.spark.saprk_sql

import org.apache.spark.sql.SparkSession

/**
 * @description: SparkSQL operating on Hive
 * @author: HaoWu
 * @create: 2020-08-10
 */
object SparkHiveTest {
  def main(args: Array[String]): Unit = {
    //========================== Create the SparkSession ============================
    System.setProperty("HADOOP_USER_NAME", "atguigu")
    val spark: SparkSession = SparkSession.builder
      .config("spark.sql.warehouse.dir", "hdfs://hadoop102:8020/user/hive/warehouse")
      .enableHiveSupport()
      .master("local[*]")
      .appName("sparksql")
      .getOrCreate()
    
    // 'spark' is the SparkSession object created above, not a package
    import spark.implicits._

    //========================== Read from Hive ============================
    spark.sql("use sparksqltest")
    spark.sql("select * from user_visit_action").show()


    //========================== Write to Hive ============================
    // 1. Start with a DataFrame
    val df1 = spark.read.json("c:/json/people.json")
    val df2 = List(("a", 11L), ("b", 22L)).toDF("n", "a")
    df1.printSchema()
    // 2. Approach 1: saveAsTable (matches by column name when saving; as long as the names match, the order does not matter)
    df1.write.saveAsTable("user_1")
    df1.write.mode("append").saveAsTable("user_1")
    // 3. Approach 2: insertInto (ignores column names; matches only by position and type)
    df1.write.insertInto("user_1") // roughly equivalent to: df1.write.mode("append").saveAsTable("user_1")
    df1.write.insertInto("user_1") // roughly equivalent to: df1.write.mode("append").saveAsTable("user_1")
    // 4. Approach 3: use a Hive INSERT statement
    spark.sql("insert into table user_1 values('zs', 100)").show
    spark.sql("select * from user_1").show

    // Close the SparkSession
    spark.close()
  }
}


Result

+----+----------+-----+
| uid|subject_id|score|
+----+----------+-----+
|1001|        01|   90|
|1001|        02|   90|
|1001|        03|   90|
|1002|        01|   85|
|1002|        02|   85|
|1002|        03|   70|
|1003|        01|   70|
|1003|        02|   70|
|1003|        03|   85|
+----+----------+-----+

Note: a database created from the development tool goes into the local warehouse by default; change the warehouse location with the parameter config("spark.sql.warehouse.dir", "hdfs://linux1:8020/user/hive/warehouse").
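As the comments in the code above note, saveAsTable in append mode resolves columns by name, while insertInto resolves them purely by position. Below is a minimal, self-contained sketch of that difference; it assumes the same hive-site.xml/metastore setup as the main example, and the table name demo_user and the sample rows are invented for illustration only.

import org.apache.spark.sql.SparkSession

object SaveModeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .enableHiveSupport()
      .master("local[*]")
      .appName("save-mode-demo")
      .getOrCreate()
    import spark.implicits._

    // Create the table once; its schema becomes (name string, age bigint)
    List(("zs", 20L)).toDF("name", "age").write.saveAsTable("demo_user")

    // saveAsTable(append) matches columns by NAME, so a different column order still lands correctly
    List((25L, "lisi")).toDF("age", "name").write.mode("append").saveAsTable("demo_user")

    // insertInto matches columns by POSITION, so the order here must follow the table schema
    List(("wangwu", 30L)).toDF("name", "age").write.insertInto("demo_user")

    spark.sql("select * from demo_user").show()
    spark.close()
  }
}

Both appends end up in demo_user, but only saveAsTable would tolerate the reordered columns; insertInto relies entirely on the DataFrame's column order matching the table schema.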

FAQ

1. Add the hive-site.xml configuration file

SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/08/10 14:19:36 WARN SharedState: Not allowing to set spark.sql.warehouse.dir or hive.metastore.warehouse.dir in SparkSession's options, it should be set statically for cross-session usages
20/08/10 14:19:41 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/08/10 14:19:41 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
20/08/10 14:19:44 WARN MetaData: Metadata has jdbc-type of null yet this is not valid. Ignored
20/08/10 14:19:44 WARN MetaData: Metadata has jdbc-type of null yet this is not valid. Ignored
20/08/10 14:19:44 WARN MetaData: Metadata has jdbc-type of null yet this is not valid. Ignored

2. Add the jackson dependency
Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/core/exc/InputCoercionException
	at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<init>(ScalaNumberDeserializersModule.scala:48)
	at com.fasterxml.jackson.module.scala.deser.NumberDeserializers$.<clinit>(ScalaNumberDeserializersModule.scala)
	at com.fasterxml.jackson.module.scala.deser.ScalaNumberDeserializersModule.$init$(ScalaNumberDeserializersModule.scala:60)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule.<init>(DefaultScalaModule.scala:18)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<init>(DefaultScalaModule.scala:36)
	at com.fasterxml.jackson.module.scala.DefaultScalaModule$.<clinit>(DefaultScalaModule.scala)

3. This can be solved by adding the following code at the very beginning of the program:

System.setProperty("HADOOP_USER_NAME", "root")

Replace root here with your own Hadoop user name.

If the warehouse location still points at the local filesystem, an exception like the following can appear (see the spark.sql.warehouse.dir note above):

Exception in thread "main" org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Unable to create database path file:/D:/SoftWare/idea-2019.2.3/wordspace/spark-warehouse/idea_test.db, failed to create database idea_test);
