首页 > 技术文章 > spark 系列之二 Dataframe的使用

suzhenxiang 2020-12-25 16:00 原文

本篇主要是Dataframe的相关使用

RDD在我们的使用过程,每一行存储的是一个RDD对象。

RDD中有很多算子,可以供我们使用。比如最简单的wordcount,我们只需要简单的三个算子就可以完成hadoop写若干行代码才能完成的事,开发效率大大提升。

我们上一讲提到的算子有map,reduceByKey,flatMap,groupByKey,mapValues,sortBy,sortByKey

但是把数据的一行作为一个对象(黑盒),总觉得不够灵活,于是又引进了DataFrame,这个跟python pandas 里面的DataFrame对象有点类似。

先说明RDD和DataFrame在构建上的区别

上代码:

    /**
     * 创建sparksession 对象
     */
    val sparkSession = SparkSession.builder()
                      .master("local")
                      .appName("wordCount")
                      .getOrCreate()

    /**
     * 引入隐式转换
     */
    import sparkSession.implicits._
    val inputFile =  "file:///D:/software_download/spark_text/word.txt"

    /**
     * 这种方式是把文件先读成DataFrame 然后转化成 Dataset,再转化成RDD,再使用RDD的算子
     */
    sparkSession.read.option("charset","UTF-8")
                      .text(inputFile)
                      .as[String]
                      .rdd
                      .flatMap(line => line.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey((a, b) => a + b)
                      .foreach(println)

    /**
     * 这种方式是把文件直接读成RDD,然后使用RDD的算子。
     */
    sparkSession.sparkContext.textFile(inputFile)
                     .flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey((a, b) => a + b)
                     .foreach(println)

上面代码中as[String]的操作是把dataframe 转换成了 dataset ,具体dataFrame和dataset有什么区别,可以先看下这篇文章。https://zhuanlan.zhihu.com/p/29830732

下面是DataFrame的一些常用操作。

/**
     * 这种方式是把文件直接读成RDD,然后使用RDD的算子。
     */
    sparkSession.sparkContext.textFile(inputFile)
                     .flatMap(line => line.split(" "))
                     .map(word => (word, 1))
                     .reduceByKey((a, b) => a + b)
                     .foreach(println)

    /**
     * 下面来读取一个csv文件,实践一些DataFrame的常规操作
     */
    //第一种方式
    val inputcsvFile="file:///D:/software_download/meta-nlp-competitor-car-model-mentioned-20201202.csv"
    val df = sparkSession.read.format("csv")
                      .option("header","true")
                      .option("charset","UTF-8")
                      .load(inputcsvFile)
    //第二种方式
    val df1 = sparkSession.read
                      .option("header","true")
                      .option("charset","UTF-8")
                      .csv(inputcsvFile)
    df.printSchema()

    // 像写sql一样读取 brandName,modelID,modelName,mainBrandName,haveOnSale 字段,并把字段brandName 改名为brandName_bn
    val dataFrame = df.select(df("brandName") as ("brandName_bn"), df("modelID"), df("modelName"), df("mainBrandName"), df("haveOnSale"))
    dataFrame.show(5)

    /**
     *把df注册成car_model表,使用sql语句查询,实现与上面相同的功能
     */
    df.createOrReplaceTempView("car_model")

    val sql_df = sparkSession.sql(
                                """select
                                  |brandName as brandName_bn,modelID,modelName,mainBrandName,haveOnSale
                                  |from car_model
                                  |""".stripMargin)
    sql_df.show(5)

    /**
     * dataframe 的一些常规操作
     */

    dataFrame.filter(df("brandName").contains("汽车") && df("modelID")>5000).show()
    dataFrame.filter(df("brandName").contains("汽车") and df("modelID")>5000).sort(df("modelID").desc).show()
    dataFrame.filter(df("brandName").contains("汽车")).filter(df("modelID")>5000).show()

    /**
     * 更改字段类型,由于读取csv的默认的类型都是String类型,但是有时候我们需要转换成其它类型进行操作
     * 比如我们把modelID类型转换成long型
     */

    dataFrame.printSchema()
    val dataFrame_change = dataFrame.withColumn("modelID", col("modelID").cast(LongType))
    dataFrame_change.printSchema()

上一篇的问题解答:

    val rdd = sparkSession.sparkContext.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
    /**
     * x._1 表示value中的第一个值,v._2 表示value中的第二个值
     */
    rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).foreach(println)

 下一讲:讲DataFrame与RDD之间的一些转化操作

推荐阅读