首页 > 技术文章 > Spark算子

zyl-study 2021-11-09 23:58 原文

算子分类

转换算子是将一个RDD变成另一个RDD之间的转换,懒执行,需要操作算子触发执行
操作算子不能将一个RDD变成另一个RDD,每一操作算子都会触发一个job
可以通过算子的返回值去判断 该算子是转换/操作算子

转换(Transformations)算子

Map

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapDemo1 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapDemo1")
    val sc: SparkContext = new SparkContext(conf)

    /**
      * map:传入一条数据 返回一条数据
      * 不会改变数据的规模
      * 接收一个函数f: 参数同RDD中数据的类型,返回值类型由自己决定
      * 将RDD中的每一条数据 依次传给函数f
      */


    // 可以通过parallelize方法构建List的RDD
    val listRDD: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5))
    val RDD2: RDD[Double] = listRDD.map(s => {
      println("map" + s)
      (s * 2).toDouble
    })

    // foreach 也是一个操作算子 可以触发任务
    RDD2.foreach(println)


    List(1, 2, 3, 4, 5).map(s => {
      println("list" + s)
      s * 2
    })

    //方便观察控制台出现网站的job 写一个死循环一直保持运行
    while (true){

    }
  }

}

FlatMap

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object FlatMapDemo2 {
  /**
    * flatMap :传入一条返回N条
    * 需要接收一个函数f,会将RDD中的每一条数据传给函数f,函数f处理完后需要返回一个集合或者数组
    * flatMap会自动将结果进行扁平化处理(展开)
    */

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("FlatMapDemo2")
    val sc: SparkContext = new SparkContext(conf)

    val listRDD: RDD[String] = sc.parallelize(List("java,spark,hive","hadoop,hive"))
    listRDD.flatMap(s=>{
      val splits: mutable.ArrayOps[String] = s.split(",")
      splits
    }).foreach(println)


    //省略写
    listRDD.flatMap(s=>s.split(","))
      .foreach(println)

    //map来写
    listRDD.map(s=>{
      val splits: Array[String] = s.split(",")
      splits
      //直接prt,只能是输出两个数组地址
    }).foreach(s=> println(s.mkString("-")))


  }

}

MapPartitions

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsDemo3 {

   /**Map、MapPartitions
   foreach、foreachPartitions
   有时在处理数据的时候需要和外部系统建立连接
   如果连接建立在Driver端 连接是不能被序列化的 及在算子内部无法使用算子外部创建的连接
   那么可以使用MapPartitions、foreachPartitions算子降低连接的创建与销毁次数,提高效率
     */
  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapPartitionsDemo3")
    val sc: SparkContext = new SparkContext(conf)

    // 读取学生数据构建RDD
    val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt",6)
    // 获取分区数量
    println(stuRDD.getNumPartitions)

    // take 也是一个action算子 会返回一个Array
    // 这里的foreach实际上是Array的方法 不是RDD的算子
    stuRDD.take(10).foreach(println)



    // mapPartitions也是一个转换算子
    stuRDD.mapPartitions(part=>{
      println("mappartitions")
      //一共输出6个mappartitions,因为设置了6个分区
      part.map(s=>s.split(",")(1)).take(10)
    }).foreach(println)


    // mapPartitionsWithIndex也是一个转换算子
    stuRDD.mapPartitionsWithIndex((index,part)=>{
      println("现在的分区是:"+index)
      part.map(s=>s.split(",")(1)).take(10)
    }).foreach(println)



    // foreachPartition也是一个操作算子,无返回值
    // 每一个分区由一个task去处理数据
    // 相当于最终每个分区会创建一次conn
    stuRDD.foreachPartition(part=>{
//      在里面创建JDBC连接,因为有6个分区,所以只有六次执行

      Class.forName("com.mysql.jdbc.Driver")
      val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/student?useSSL=false","root","123456")
      val statement: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")
      println("分区")

      // // 按分区去处理数据
      part.foreach(s=>{
        val splits: Array[String] = s.split(",")
        val id: Int = splits(0).toInt
        val name: String = splits(1)
        val age: Int = splits(2).toInt
        val gender: String = splits(3)
        val clazz: String = splits(4)

        //将遍历的每一个数据变成每一个 ?
        statement.setInt(1,id)
        statement.setString(2,name)
        statement.setInt(3,age)
        statement.setString(4,gender)
        statement.setString(5,clazz)

        //一次五个加入到addBatch中处理
        statement.addBatch()

      })

      //这里不是execute,因为处理上面的Batch,所以用executeBatch
      statement.executeBatch()
      statement.closeOnCompletion()
      conn.close()

    })

  }

}

Filter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FilterDemo4 {

  /**
    * filter算子:主要用于过滤数据
    * 接收一个函数f,函数f需要返回一个布尔值,为true则保留,false则过滤
    */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("FilterDemo4").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    //过滤奇数
    val listRDD: RDD[Int] = sc.parallelize(List(1,2,3,4,5,6,7,8))
    listRDD.filter(s=>{
      s%2==1
    }).foreach(println)

  }
}

Sample

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SampleDemo5 {

  /**
    * sample:对数据取样
    * withReplacement 有无放回
    * fraction 抽样比例
    */

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("SampleDemo5").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    
    val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt")
    stuRDD.sample(false,0.1)
      .foreach(println)
  }

}

Union

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UnionDemo6 {

  /**
    * union : 将两个RDD首尾相连变成一个RDD
    * 两个RDD的结构必须一样
    */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("UnionDemo6").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val listRDD1: RDD[Int] = sc.parallelize(List(1,2,3,4,5))
    println(listRDD1.getNumPartitions)  // 1
    val listRDD2: RDD[Int] = sc.parallelize(List(1,2,3))
    println(listRDD2.getNumPartitions)  // 1

    val resRDD: RDD[Int] = listRDD1.union(listRDD2)
    println(resRDD.getNumPartitions)    // 2
    resRDD.foreach(println)
  }

}

Join

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object JoinDemo7 {
  def main(args: Array[String]): Unit = {
    /**
      * join 同MySQL中的join操作类似
      * 将两个k-v格式的RDD,按照相同的key做类似与inner join操作
      */


    val conf: SparkConf = new SparkConf().setAppName("JoinDemo7").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)


    // 构建K-V格式的RDD
    val tuple2RDD1: RDD[(String, String)] = sc.parallelize(List(("001", "张三"), "002" -> "小红", "003" -> "小明"))
    val tuple2RDD2: RDD[(String, Int)] = sc.parallelize(List(("001", 20), "002" -> 22, "003" -> 21))
    val tuple2RDD3: RDD[(String, String)] = sc.parallelize(List(("001", "男"), "002" -> "女"))

    val joinRDD: RDD[(String, (String, Int))] = tuple2RDD1.join(tuple2RDD2)

    joinRDD.map(kv=>{
      val id: String = kv._1
      val name: String = kv._2._1
      val age: Int = kv._2._2
      id+","+name+","+age
    }).foreach(println)




    // 使用match接收RDD中的每一条数据,用的是map{}
    joinRDD.map {
    case (id: String, (name: String, age: Int))
    =>id+"|"+name+"|"+age
  }.foreach(println)


    tuple2RDD2.leftOuterJoin(tuple2RDD3)
      .map{
        // 关联上的处理逻辑
        case (id:String,(age:Int,Some(gender)))
        =>id+","+age+"|"+gender
        // 未关联上的处理逻辑
        case (id:String,(age:Int,None))
          =>id+","+age+","+"-"
      }.foreach(println)
  }
}

Groupby

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GroupbyDemo8 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("GroupbyDemo8").setMaster("local")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt")

    //得到班级人数
    stuRDD.groupBy(s=>s.split(",")(4))
      .map(kv=>{
        val clazz: String = kv._1
        val value: Iterable[String] = kv._2
        val size: Int = value.size
        clazz+","+size
      }).foreach(println)


    /**
      * groupByKey 作用K-V格式的RDD上,默认按K分组
      */
    stuRDD.map(s=>(s.split(",")(4),s))
      .groupByKey()
      .map(kv=>{
        kv._1+","+kv._2.size
      }).foreach(println)



    /**
      * reduceByKey 需要接收一个聚合函数
      * 首先会对数据按key分组 然后在组内进行聚合(一般是加和,也可以是Max、Min之类的操作)
      * 相当于 MR 中的combiner
      * 可以在Map端进行预聚合,减少shuffle过程需要传输的数据量,以此提高效率
      * 相对于groupByKey来说,效率更高,但功能更弱
      * 幂等操作
      * y = f(x) = f(y) = f(f(x))
      *
      */


    //得到班级人数另一种算法
    stuRDD.map(s=>(s.split(",")(4),1))
      .groupBy(_._1)
      .map(kv=>{
        val clazz: String = kv._1
        val sum: Int = kv._2.map(_._2).sum
        clazz+","+sum
      }).foreach(println)



    //使用reduceByKey
    stuRDD.map(s=>(s.split(",")(4),1))
      .reduceByKey((x,y)=>x+y)
      .foreach(println)

    //简写
    stuRDD.map(s=>(s.split(",")(4),1))
      .reduceByKey(_+_)
      .foreach(println)
  }
}

sortBY

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortDemo9 {

  /**
    * sortBy 转换算子
    * 指定按什么排序 默认升序
    *
    * sortByKey 转换算子
    * 需要作用在KV格式的RDD上,直接按key排序 默认升序
    */

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SortDemo9")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt")

    // ascending = false 降序排列
    stuRDD.sortBy(s=>s.split(",")(2),false)
      .foreach(println)

    stuRDD.map(s=>(s.split(",")(2),s))
      .sortByKey()
      .foreach(println)

  }

}

mapValues

import org.apache.spark.{SparkConf, SparkContext}

object MapValuesDemo10 {

  /**
    * mapValues 转换算子
    * 需要作用在K—V格式的RDD上
    * 传入一个函数f
    * 将RDD的每一条数据的value传给函数f,key保持不变
    * 数据规模也不会改变
    */

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("MapValuesDemo10")
    val sc: SparkContext = new SparkContext(conf)


    //对kv格式中的v进行操作
    sc.parallelize(List(("张三", 1), ("李四", 2), ("王五", 3)))
      .mapValues(value=>value*20)
      .foreach(println)
  }

}

操作(Action)算子

每个action算子触发一个job

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ActionDemo11 {
  def main(args: Array[String]): Unit = {
    /**
      * 操作算子(行为算子) :每一个action算子都会触发一个job
      * foreach、count、take、collect、reduce、saveAsTextFile
      */

    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("ActionDemo11")
    val sc: SparkContext = new SparkContext(conf)

    val stuRDD: RDD[String] = sc.textFile("scala/data/students.txt")

    // foreach
    stuRDD.foreach(println)

    // take 取出前n条数据 相当于limit
    stuRDD.take(10).foreach(println)

    // count
    // 返回RDD的数据量的多少
    println(stuRDD.count())

    // collect
    // 将RDD转换为Scala中的Array
    // 注意数据量的大小 容易OOM
    val stuARR: Array[String] = stuRDD.collect()
    stuARR.take(10).foreach(println)

    // reduce 全局聚合
    // 相当于select sum(age) from student group by 1
    val i: Int = stuRDD.map(s => s.split(",")(2).toInt)
      .reduce(_ + _)
    println(i)

    // saveAsTextFile用于保存
   
  }

}

推荐阅读