首页 > 技术文章 > sparksql系列(五) SparkSql异常处理,优化,及查看执行计划

wuxiaolong4 2019-10-20 22:58 原文

有了上面四篇文章,再加上一些异常处理、优化,开发基本就没什么问题了。下面我们开始:

一:SparkSql异常处理

1.将类转换为DF

实际开发过程中有很多需要将一个数字或者汇聚出来的数据转换为DF的需求

这时候可以将数字或者数据转换成一个类,将类转换为DF

    val data = scala.collection.mutable.MutableList[Data]()
    data.+=(Data("a","b"))
    import sparkSession.implicits._
    data.toDF().show(100)
2.读JSON文件异常处理
    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
        df2 = sparkSession.read.json("/JAVA/data/")
    } catch {
        case e: Exception => {
          println("error info")
        }
    }
    df2.show(100)
3.读CSV文件异常处理
    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
        df2 = sparkSession.read.option("sep""|").csv("/JAVA/data/")
            .toDF("name","sex")
    } catch {
        case e: Exception => {
            println("error info")
       }
    }
    df2.show(100)
4.读TEXT文件异常处理。
个人理解CSV和TEXT一样,直接csv即可。还有一个原因是TEXT需要手动的去切分字符串作为一个列,使用起来太不方便了。还不如直接使用CSV
5.写文件
    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    var df = sparkSession.emptyDataFrame
    df = sparkSession.read.option("sep""|").csv("/JAVA/data")
.       toDF("name","sex")
    df.write.mode(SaveMode.Overwrite).option("sep""|").csv("/JAVA/data1")

    SaveMode.Overwrite:覆盖式写文件,没有文件夹会创建文件夹
    SaveMode.Append:添加式写文件,没有文件夹会报错,建议使用SaveMode.Overwrite
6.数据异常填充
进行真正开发的时候,经常join导致有一些空值(NULL),有时候产品需要将空值转换为一些特殊处理值:
    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList(
        "{'name':'','age':''}",
        "{'name':'sunliu','age':'19','vip':'true'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.na.fill(Map("name"->"zhangsan","age"->"18","vip"->"false")).show(100)//第一个数据不是空值,是空字符串

结果

agenamevip
    false
19 wangwu true
19 wangwu true

二:SparkSql优化

1.缓存

Spark中当一个Rdd多次使用的时候就需要进行缓存。缓存将大大的提高代码运行效率。

      val sparkSession= SparkSession.builder().master("local").getOrCreate()
      val javasc = new JavaSparkContext(sparkSession.sparkContext)

      val nameRDD = javasc.parallelize(Arrays.asList(
          "{'name':'','age':''}",
          "{'name':'sunliu','age':'19','vip':'true'}"));
      val namedf = sparkSession.read.json(nameRDD)
      namedf.persist();           namedf.cache();

个人建议使用MEMORY_AND_DISK_SER,因为内存还是比较珍贵的,磁盘虽然慢但是大。

尽量不要使用MEMORY_AND_DISK_SER_2,这种后面有一个_2的,因为这是备份两个,一般情况下是不需要备份两个的。备份多了浪费内存。

2.Join策略
Spark有三种join的策略:broadcast join、Shuffle Hash Join、BroadcastHashJoin
  • broadcastHash join(大表和极小表):

    当大表join小表的时候:将小表进行广播到各个节点。

    优点:不用进行数据shuffle,每个节点进行自己节点上数据的计算

    缺点:将一个表的数据全部加载到主节点,对主节点的压力较大。

    参数:广播的默认大小是10M可以适当将大小调整。 sparkSession.sql("set spark.sql.autoBroadcastJoinThreshold=134217728")

  • Shuffle Hash Join(大表和小表)

    两个表进行重新分区之后,进行两个分区的数据遍历。

    优点:分区之后数据更小了,就全部加载到内存遍历就行了

    缺点:相对于broadcastHash join来说还是有一次shuffle

  • SortMergeJoin(大表和小表)

    两个表进行重新分区之后,进行两个分区的数据遍历,个人感觉分区前和Shuffle Hash Join没什么区别。

    缺点:分区之后数据还不能全部加载到内存,需要进行排序。将相同key的加载到内存。

执行计划

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.explain()//显示执行计划

上线提交命令示例

    spark-submit
    --class class
    --master yarn                                      
    --executor-memory 28g                     //单台机器1/4
    --driver-memory 1g                        //driver内存一般就够了
    --num-executors 4                         //执行个数
    --executor-cores 6                        //机器内存cpu之比
    --deploy-mode cluster                     //必须配置,默认是client模式
    --conf spark.default.parallelism=1600 \        //最终文件个数
    --conf spark.sql.shuffle.partitions=1600 \     //最终文件个数
    --conf spark.dynamicAllocation.enabled=false \  //动态内存关闭
    --conf spark.shuffle.compress=true \            //中间结果压缩
    --conf spark.shuffle.spill.compress=true \      //中间结果压缩
    --conf spark.speculation=true \                 //慢任务检测
    --conf spark.speculation.interval=30000 \       //慢任务检测
    --conf spark.speculation.quantile=0.8 \         //慢任务检测
    --conf spark.speculation.multiplier=1.5 \       //慢任务检测
    --conf spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864 \
    --conf spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=67108864 \
    Jar.jar
      //数组函数使用代码必备

Apache中文文档
http://spark.apachecn.org/#/docs/7?id=spark-sql-dataframes-and-datasets-guide

有了上面四篇文章,再加上一些异常处理、优化,开发基本就没什么问题了。下面我们开始:

一:SparkSql异常处理

1.将类转换为DF

实际开发过程中有很多需要将一个数字或者汇聚出来的数据转换为DF的需求

这时候可以将数字或者数据转换成一个类,将类转换为DF

    val data = scala.collection.mutable.MutableList[Data]()
    data.+=(Data("a","b"))
    import sparkSession.implicits._
    data.toDF().show(100)
2.读JSON文件异常处理
    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
        df2 = sparkSession.read.json("/JAVA/data/")
    } catch {
        case e: Exception => {
          println("error info")
        }
    }
    df2.show(100)
3.读CSV文件异常处理
    val sparkSession= SparkSession.builder().master("local").getOrCreate()

    var df2 = sparkSession.emptyDataFrame
    try {
        df2 = sparkSession.read.option("sep""|").csv("/JAVA/data/")
            .toDF("name","sex")
    } catch {
        case e: Exception => {
            println("error info")
       }
    }
    df2.show(100)
4.读TEXT文件异常处理。
个人理解CSV和TEXT一样,直接csv即可。还有一个原因是TEXT需要手动的去切分字符串作为一个列,使用起来太不方便了。还不如直接使用CSV
5.写文件
    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    var df = sparkSession.emptyDataFrame
    df = sparkSession.read.option("sep""|").csv("/JAVA/data")
.       toDF("name","sex")
    df.write.mode(SaveMode.Overwrite).option("sep""|").csv("/JAVA/data1")

    SaveMode.Overwrite:覆盖式写文件,没有文件夹会创建文件夹
    SaveMode.Append:添加式写文件,没有文件夹会报错,建议使用SaveMode.Overwrite
6.数据异常填充
进行真正开发的时候,经常join导致有一些空值(NULL),有时候产品需要将空值转换为一些特殊处理值:
    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList(
        "{'name':'','age':''}",
        "{'name':'sunliu','age':'19','vip':'true'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.na.fill(Map("name"->"zhangsan","age"->"18","vip"->"false")).show(100)//第一个数据不是空值,是空字符串

结果

agenamevip
    false
19 wangwu true
19 wangwu true

二:SparkSql优化

1.缓存

Spark中当一个Rdd多次使用的时候就需要进行缓存。缓存将大大的提高代码运行效率。

      val sparkSession= SparkSession.builder().master("local").getOrCreate()
      val javasc = new JavaSparkContext(sparkSession.sparkContext)

      val nameRDD = javasc.parallelize(Arrays.asList(
          "{'name':'','age':''}",
          "{'name':'sunliu','age':'19','vip':'true'}"));
      val namedf = sparkSession.read.json(nameRDD)
      namedf.persist();           namedf.cache();

个人建议使用MEMORY_AND_DISK_SER,因为内存还是比较珍贵的,磁盘虽然慢但是大。

尽量不要使用MEMORY_AND_DISK_SER_2,这种后面有一个_2的,因为这是备份两个,一般情况下是不需要备份两个的。备份多了浪费内存。

2.Join策略
Spark有三种join的策略:broadcast join、Shuffle Hash Join、BroadcastHashJoin
  • broadcastHash join(大表和极小表):

    当大表join小表的时候:将小表进行广播到各个节点。

    优点:不用进行数据shuffle,每个节点进行自己节点上数据的计算

    缺点:将一个表的数据全部加载到主节点,对主节点的压力较大。

    参数:广播的默认大小是10M可以适当将大小调整。 sparkSession.sql("set spark.sql.autoBroadcastJoinThreshold=134217728")

  • Shuffle Hash Join(大表和小表)

    两个表进行重新分区之后,进行两个分区的数据遍历。

    优点:分区之后数据更小了,就全部加载到内存遍历就行了

    缺点:相对于broadcastHash join来说还是有一次shuffle

  • SortMergeJoin(大表和小表)

    两个表进行重新分区之后,进行两个分区的数据遍历,个人感觉分区前和Shuffle Hash Join没什么区别。

    缺点:分区之后数据还不能全部加载到内存,需要进行排序。将相同key的加载到内存。

执行计划

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val javasc = new JavaSparkContext(sparkSession.sparkContext)

    val nameRDD = javasc.parallelize(Arrays.asList("{'name':'wangwu','age':'18','vip':'t'}"));
    val namedf = sparkSession.read.json(nameRDD)

    namedf.explain()//显示执行计划

上线提交命令示例

    spark-submit
    --class class
    --master yarn                                      
    --executor-memory 28g                     //单台机器1/4
    --driver-memory 1g                        //driver内存一般就够了
    --num-executors 4                         //执行个数
    --executor-cores 6                        //机器内存cpu之比
    --deploy-mode cluster                     //必须配置,默认是client模式
    --conf spark.default.parallelism=1600 \        //最终文件个数
    --conf spark.sql.shuffle.partitions=1600 \     //最终文件个数
    --conf spark.dynamicAllocation.enabled=false \  //动态内存关闭
    --conf spark.shuffle.compress=true \            //中间结果压缩
    --conf spark.shuffle.spill.compress=true \      //中间结果压缩
    --conf spark.speculation=true \                 //慢任务检测
    --conf spark.speculation.interval=30000 \       //慢任务检测
    --conf spark.speculation.quantile=0.8 \         //慢任务检测
    --conf spark.speculation.multiplier=1.5 \       //慢任务检测
    --conf spark.hadoop.mapreduce.input.fileinputformat.split.minsize=67108864 \
    --conf spark.hadoop.mapreduce.input.fileinputformat.split.maxsize=67108864 \
    Jar.jar
      //数组函数使用代码必备

Apache中文文档
http://spark.apachecn.org/#/docs/7?id=spark-sql-dataframes-and-datasets-guide

推荐阅读