Spark Scala API: reading and manipulating JSON, creating views, and running SQL

han-guang-xue 2018-11-29 11:02

JSON format

{"user_id":3,"user_name":"张三3","user_age":17,"user_balance":20.53}
{"user_id":4,"user_name":"张三4","user_age":15,"user_balance":20.53}
{"user_id":5,"user_name":"张三5","user_age":19,"user_balance":20.54}
{"user_id":6,"user_name":"张三6","user_age":15,"user_balance":20.55}

Code: reading JSON-format files

import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

object Dome {
  /**
    * Word count
    */
  def fun2: Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val line = sc.textFile("E:\\words.txt")
    line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,true).foreach(println(_))
    sc.stop()
  }

  /**
    * Read from HDFS; the job must be packaged as a jar and submitted to the cluster
    */
  def fun3(args:Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WC")
    val sc = new SparkContext(conf)
    val line = sc.textFile(args(0))
    line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile(args(1))
    sc.stop()
  }

  /**
    * Count how many times each adjacent word pair occurs
    * A;B;C;D;E;F
    * C;A;D;C;E;S
    * C;E;F
    */
  def fun(): Unit = {
    val conf = new SparkConf()
    conf.setAppName("wordCount")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    val line = sc.textFile("E:\\words.txt")
    line.map(_.split(";")).flatMap(x=>{
      for(i<-0 until x.length-1) yield (x(i)+","+x(i+1),1)
    }).foreach(println(_))
    sc.stop()
  }

  /**
    * Read JSON-format content
    *   1. Create a SparkConf
    *   2. Create a SparkContext
    *   3. Obtain an SQLContext
    *   4. Obtain a DataFrame
    *   5. Display the data in the DataFrame
    */
  def fun1(): Unit ={
    val conf = new SparkConf().setAppName("JSON").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val people = sqlContext.read.format("JSON").load("E:\\user.json")
    people.show(30, false)  // numRows = 30: show the first 30 rows; truncate = false: do not truncate column values
    sc.stop()
  }

  /**
    * The non-deprecated approach; see the official documentation
    */
  def fun4(): Unit ={
    val conf = new SparkConf().setAppName("JSON").setMaster("local")
    val spark = SparkSession.builder().appName("JSONDome")
      .config(conf).getOrCreate()
    val people:DataFrame = spark.read.json("E:\\user.json")
    people.show(4, false)  // truncate = false: do not truncate column values
    people.printSchema()  // print the schema
    people.select("user_name").show()
    import spark.implicits._  // required for the $"column" syntax used below
    people.select($"user_name", $"user_age" + 10).show()
    people.select($"user_name", $"user_age" > 15).show()
    people.select("user_name", "user_age").filter($"user_age" > 15).show()
    people.groupBy("user_age").count().show()  // returns a DataFrame
    spark.stop()
  }
  /**
    * Create temporary views and query them with SQL
    */
  def fun5(): Unit ={
    val conf = new SparkConf().setAppName("JSON").setMaster("local")
    val spark = SparkSession.builder().appName("JSONDome")
      .config(conf).getOrCreate()
    val people:DataFrame = spark.read.json("E:\\user.json")

    // createOrReplaceTempView creates (or replaces) a session-scoped temporary view
    people.createOrReplaceTempView("user")
    // spark.sql still returns a DataFrame
    spark.sql("select * from user").show()
    spark.sql("select user_id,user_name,user_age,user_balance yue from user where user_age>15").show() // the column alias must not be Chinese
//    spark.sql("update user set user_age = user_age+100")   // data in a view cannot be modified
    spark.sql("select * from user").show()

    people.createGlobalTempView("user1")  // creates a global temporary view, registered under the global_temp database
    spark.sql("select * from global_temp.user1").show()
    spark.newSession().sql("select * from global_temp.user1").show()  // a global view is visible across sessions
    spark.sql("select user_id,user_name,user_age,user_balance yue from global_temp.user1 where user_age>15").show() // the column alias must not be Chinese

    people.createTempView("user2")  // createTempView does not replace an existing view with the same name
    spark.sql("select * from user2").show()
    //spark.sql("alter table user2 drop COLUMN user_balance RESTRICT") // the schema of a view cannot be altered
    spark.sql("select * from user2").show()
    spark.stop()
  }

  /**
    * Read multiple files
    */
  def fun6(): Unit ={
    val conf = new SparkConf().setAppName("JSON").setMaster("local")
    val spark = SparkSession.builder().appName("JSONDome")
      .config(conf).getOrCreate()
    val people:DataFrame = spark.read.json("E:\\empAndDept/*.json")
    people.show(100)
    spark.stop()
  }

  def main(args: Array[String]): Unit = {
    fun6()
  }
}
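
As the comments in fun5 note, a temporary view is read-only: UPDATE and ALTER TABLE statements are rejected. A common workaround is to express the change as a SELECT that derives the new values and re-register the result under the same view name. The sketch below is illustrative only; it assumes the same E:\user.json file and field names used above.

import org.apache.spark.sql.SparkSession

// Sketch only: assumes the same E:\user.json and field names as in the sample above.
object UpdateViewDome {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UpdateViewDome").master("local").getOrCreate()
    spark.read.json("E:\\user.json").createOrReplaceTempView("user")

    // instead of "update user set user_age = user_age + 100" (not supported),
    // derive the new values with a SELECT ...
    val updated = spark.sql(
      "select user_id, user_name, user_age + 100 as user_age, user_balance from user")
    // ... and re-register the result under the same view name
    updated.createOrReplaceTempView("user")
    spark.sql("select * from user").show()

    spark.stop()
  }
}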

 

 Code: read table data from a remote database and write it to disk

package sql

import java.util.Properties

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SQLDome {
  /**
    * Read content from the database
    */
  def fun1(): Unit ={
    val conf = new SparkConf().setAppName("JSON").setMaster("local")
    val spark = SparkSession.builder().appName("JSONDome")
      .config(conf).getOrCreate()
    val url = "jdbc:mysql://192.168.188.130/mydatabase?user=root&password=root"
    val tableEmp = "emp"
    val tableDept = "dept"
    val pop = new Properties()
    pop.put("Driver","com.mysql.jdbc.Driver")
    spark.read.jdbc(url,tableEmp,pop).createOrReplaceTempView("emp")
    spark.read.jdbc(url,tableDept,pop).createTempView("dept")

    spark.sql("select * from emp").show(10)
    spark.sql("select * from dept").show()

    // join with a filter condition
    val p = spark.sql("select * from dept,emp where dept.dept_id=emp.d_id and dept.dept_id = 3")
    p.show()

    /**
      * overwrite: overwrite existing output
      * append: append to existing output
      * error: fail if the output already exists (the default)
      * ignore: do nothing if the output already exists
      */
    p.write.mode("overwrite").json("E:\\empAndDept")
    spark.sql("select * from dept").write.mode("overwrite").json("E:\\dept")
    spark.stop()
  }

  def main(args: Array[String]): Unit = {
    fun1()
  }
}
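
The JDBC URL above embeds the user name and password; they can also be supplied through the Properties object, and the joined result can be written back to MySQL with DataFrameWriter.jdbc instead of to local JSON files. The sketch below reuses the same host and database; the target table name "emp_dept" is a made-up example, not part of the original demo.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}

// Sketch only: the target table name "emp_dept" is hypothetical.
object JdbcWriteDome {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcWriteDome").master("local").getOrCreate()

    val url = "jdbc:mysql://192.168.188.130/mydatabase"
    val props = new Properties()
    props.put("user", "root")          // credentials via Properties instead of the URL
    props.put("password", "root")
    props.put("driver", "com.mysql.jdbc.Driver")

    spark.read.jdbc(url, "emp", props).createOrReplaceTempView("emp")
    spark.read.jdbc(url, "dept", props).createOrReplaceTempView("dept")

    val joined = spark.sql(
      "select * from dept, emp where dept.dept_id = emp.d_id and dept.dept_id = 3")

    // write the join result back to MySQL (overwriting the table if it exists)
    joined.write.mode(SaveMode.Overwrite).jdbc(url, "emp_dept", props)

    spark.stop()
  }
}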

 

pom.xml dependencies and plugin configuration (IntelliJ IDEA)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com2</groupId>
    <artifactId>spark2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.11</scala.version>
        <spark.version>2.1.1</spark.version>
        <hadoop.version>2.7.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.45</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>


</project>

 
