SparkSQL Series (4): Operating Databases with SparkSQL

wuxiaolong4 2019-10-20 13:23

Part 1: Operating MySQL with SparkSQL

  • As usual, we first factor out the shared setup code:
    import java.util.Arrays

    import org.apache.spark.SparkConf
    import org.apache.spark.api.java.JavaSparkContext
    import org.apache.spark.sql.{DataFrame, Row, SparkSession, functions}
    import org.apache.spark.sql.functions.{col, desc, length, row_number, trim, when}
    import org.apache.spark.sql.functions.{countDistinct,sum,count,avg}
    import org.apache.spark.sql.functions.concat
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.sql.SaveMode
    import java.util.ArrayList


    object WordCount {

        // Builds a small test DataFrame from JSON strings, plus the JDBC
        // connection properties used by the read/write examples below.
        def dataAndJdbcoption() = {

            val sparkSession = SparkSession.builder().master("local").getOrCreate()
            val javasc = new JavaSparkContext(sparkSession.sparkContext)

            // Three single-column JSON rows with ids 7, 8 and 9.
            val nameRDD1 = javasc.parallelize(Arrays.asList("{'id':'7'}","{'id':'8'}","{'id':'9'}"))
            val nameRDD1df = sparkSession.read.json(nameRDD1)

            val prop = new java.util.Properties
            prop.setProperty("user","root")
            prop.setProperty("password","123456")
            prop.setProperty("driver","com.mysql.jdbc.Driver")
            prop.setProperty("dbtable","blog")
            prop.setProperty("url","jdbc:mysql://127.0.0.1:3306/test")

            (nameRDD1df, prop)

        }

    }
1. Reading from MySQL
    val df = dataAndJdbcoption()._1
    val prop = dataAndJdbcoption()._2

    val sparkSession= SparkSession.builder().master("local").getOrCreate()
    val data = sparkSession.read.format("jdbc").option("user","root").option("password","123456")
        .option("driver","com.mysql.jdbc.Driver")
        .option("url","jdbc:mysql://127.0.0.1:3306/test").option("dbtable""blog")
        .load()
    data.show(100)
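
Since dataAndJdbcoption() already returns the JDBC Properties, the same read can also be written with read.jdbc. Below is a minimal sketch of a partitioned read; the partition column id and its bounds are assumptions for illustration:

    // Reuses the url/table from the Properties built in dataAndJdbcoption().
    val url = prop.getProperty("url")
    val table = prop.getProperty("dbtable")

    // Partitioned read: Spark splits "id" into 4 ranges (assumed bounds)
    // and reads them in parallel over 4 JDBC connections.
    val partitioned = sparkSession.read.jdbc(
        url, table,
        columnName = "id", lowerBound = 1L, upperBound = 1000L, numPartitions = 4,
        connectionProperties = prop)
    partitioned.show(100)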
2. Writing to MySQL
  val df = dataAndJdbcoption()._1
  val prop = dataAndJdbcoption()._2
  df.repartition(1).write.mode(SaveMode.Append).jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)
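
The save mode decides what happens when the target table already exists. A small sketch of the alternative to Append; the truncate and batchsize values here are assumptions to illustrate the options:

  // Overwrite drops and recreates the table by default; with the
  // "truncate" option Spark issues TRUNCATE TABLE and keeps the schema.
  df.repartition(1).write
    .mode(SaveMode.Overwrite)
    .option("truncate", "true")     // keep the existing table definition
    .option("batchsize", "10000")   // rows per JDBC batch insert (assumed value)
    .jdbc(prop.getProperty("url"), prop.getProperty("dbtable"), prop)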

Part 2: Operating Hive with SparkSQL

1. How we read Hive data at my company
  • In practice we read the files under the Hive table's location directly and generate the final output files from them.
2. How we write Hive data at my company
  • We generate the output files first and then LOAD them into Hive; see the sketch below.
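
A minimal sketch of that write-then-load pattern; the staging path /tmp/blog_output and the table names are hypothetical, and the table's storage format must match the files being loaded:

    // Write-then-load: "/tmp/blog_output", "sourceTable" and "table1"
    // are hypothetical names.
    val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()
    val result = sparkSession.sql("select aa from sourceTable")

    // Step 1: write the result as files into a staging directory.
    result.write.mode(SaveMode.Overwrite).parquet("/tmp/blog_output")

    // Step 2: move those files into the table's location; OVERWRITE
    // replaces its current contents. The table must be stored as Parquet
    // for the loaded files to be readable.
    sparkSession.sql("LOAD DATA INPATH '/tmp/blog_output' OVERWRITE INTO TABLE table1")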
3. Operating Hive data directly with SQL
    val conf = new SparkConf().setAppName("WordCount")
    //Merge small files: SparkSQL defaults to 200 shuffle tasks, and each task writes its own file, so a query can produce many small files. Many more tunable parameters are listed by sparkSession.sql("SET -v").

    conf.set("mapreduce.input.fileinputformat.split.minsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.node","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.minsize.per.rack","1024000000")
    conf.set("mapreduce.input.fileinputformat.split.maxsize.per.rack","1024000000")
    val sparkSession= SparkSession.builder().enableHiveSupport().config(conf).getOrCreate()

    sparkSession.sql("insert into table table1 select aa/*+ REPARTITION(3) */ from sparksqlTempTable")

Besides the approach above, there is one more way to merge files:

    val dataFrame = sparkSession.sql("select aa from table ").coalesce(3)  // the logs show 3 tasks
    dataFrame.createOrReplaceTempView("sparksqlTempTable")
    sparkSession.sql("insert into table table1 select aa from sparksqlTempTable")
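
Since the small files come from the 200 default shuffle tasks mentioned above, a third option is to lower that default for the whole session. A sketch under the assumption that the query actually shuffles (a plain scan keeps its input splits, so the setting only takes effect with an aggregation, a join, or an explicit DISTRIBUTE BY):

    // Lower the default shuffle parallelism so the final stage writes
    // 3 files instead of 200. This affects every shuffle in the session.
    sparkSession.conf.set("spark.sql.shuffle.partitions", "3")
    sparkSession.sql("insert into table table1 select aa from sparksqlTempTable distribute by aa")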
