Spark and MySQL Data Exchange Examples

jizhong 2020-04-03 11:08

1. Spark can read from many kinds of data sources; this example reads from MySQL.

2. Preparation:

Scala, IntelliJ IDEA, and mysql-connector-java. Look up the versions at https://mvnrepository.com/

3. Reading data, method 1:

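import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object WordFreq {
  def main(args: Array[String]): Unit = {

    // Local session; a single shuffle partition is enough for a small demo
    val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1).getOrCreate()

    // Connection properties handed to the JDBC driver
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    // Connector/J 5.x class name; Connector/J 8.x uses com.mysql.cj.jdbc.Driver
    properties.setProperty("driver", "com.mysql.jdbc.Driver")

    // Option A: read the whole table ttt
    val person: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "ttt", properties)
    person.show()
    // Option B: read the result of a subquery; the derived table needs an alias (T)
    spark.read.jdbc("jdbc:mysql://localhost:3306/acc", "(select * from ut_tt) T", properties).show()

    spark.stop()
  }
}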
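Option B works because the table argument of spark.read.jdbc accepts anything that is valid in a SQL FROM clause, including a parenthesized subquery with an alias. The sketch below shows one way to build that string from a plain query. JdbcTableExpr and fromQuery are hypothetical names made up for this illustration; they are not part of Spark.

```scala
// Hypothetical helper (not a Spark API): turns a SELECT statement into a
// derived-table expression that can be passed where a table name is expected.
object JdbcTableExpr {
  def fromQuery(sql: String, alias: String = "T"): String = {
    // Drop surrounding whitespace and a trailing semicolon, which would be
    // invalid inside the parentheses.
    val trimmed = sql.trim.stripSuffix(";").trim
    s"($trimmed) $alias"
  }
}
```

With this in place, the second read could be written as spark.read.jdbc(url, JdbcTableExpr.fromQuery("select * from ut_tt"), properties), assuming url holds the same JDBC URL as above.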

Reading data, method 2:

    val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1).getOrCreate()
    // All JDBC settings collected in one options map
    val map: Map[String, String] = Map[String, String](
      "url" -> "jdbc:mysql://localhost:3306/yyyy",
      "driver" -> "com.mysql.jdbc.Driver",
      "user" -> "root",
      "password" -> "root",
      "dbtable" -> "notice")
    val score: DataFrame = spark.read.format("jdbc").options(map).load()
    score.show()

Reading data, method 3:

    val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1).getOrCreate()
    // Same settings as method 2, supplied one option at a time
    val reader: DataFrameReader = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/yyyyyy")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "root")
      .option("dbtable", "notice")

    val source2: DataFrame = reader.load()

    source2.show()
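Methods 2 and 3 pass the same five settings (url, driver, user, password, dbtable) and differ only in how they are supplied. A minimal sketch of keeping those settings in one place follows. MysqlConn and its members are hypothetical names invented for this example, and the default driver class assumes the same Connector/J class used in the snippets above.

```scala
// Hypothetical container for the connection settings repeated in each snippet.
final case class MysqlConn(
    host: String,
    port: Int,
    database: String,
    user: String,
    password: String,
    driver: String = "com.mysql.jdbc.Driver") {

  // JDBC URL in the same form as the examples: jdbc:mysql://host:port/database
  def url: String = s"jdbc:mysql://$host:$port/$database"

  // Options map in the shape expected by spark.read.format("jdbc").options(...)
  def readOptions(table: String): Map[String, String] = Map(
    "url" -> url,
    "driver" -> driver,
    "user" -> user,
    "password" -> password,
    "dbtable" -> table)
}
```

A read would then look like spark.read.format("jdbc").options(conn.readOptions("notice")).load(), where conn is an instance such as MysqlConn("localhost", 3306, "yyyy", "root", "root").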

Writing data to a MySQL database

    // Insert the query result into a MySQL table

    val spark: SparkSession = SparkSession.builder().master("local").appName("getDatafromMysql")
      .config("spark.sql.shuffle.partitions", 1).getOrCreate()

    // Source: the last segment of the URL is the database name, dbtable is the table name
    val result = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/ttttttt")
      .option("driver", "com.mysql.jdbc.Driver").option("dbtable", "notice")
      .option("user", "root").option("password", "root").load()

    // Connection properties for the target database
    val properties: Properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "root")
    properties.setProperty("driver", "com.mysql.jdbc.Driver")
    properties.setProperty("characterEncoding", "utf8")
    // SaveMode.Append adds the rows to notice_copy1 instead of replacing the table
    result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/iii", "notice_copy1", properties)
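The read and write examples each build a java.util.Properties object by hand. The sketch below collects that into one function so extra connection settings such as characterEncoding can be passed as a map. JdbcProps and build are hypothetical names used only for illustration.

```scala
import java.util.Properties

// Hypothetical helper: builds the Properties object expected by
// DataFrameReader.jdbc and DataFrameWriter.jdbc.
object JdbcProps {
  def build(
      user: String,
      password: String,
      driver: String,
      extra: Map[String, String] = Map.empty): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props.setProperty("driver", driver)
    // Extra settings are applied last, so they override the defaults above
    extra.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }
}
```

The write above could then use JdbcProps.build("root", "root", "com.mysql.jdbc.Driver", Map("characterEncoding" -> "utf8")) as its last argument.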

Required imports:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.DataFrameReader
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import java.util.Properties

Add the mysql-connector-java dependency to the pom file:

<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>*****</version>
</dependency>
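The connector version chosen here affects the driver class name used in the code: Connector/J 5.x ships com.mysql.jdbc.Driver, while Connector/J 8.x ships com.mysql.cj.jdbc.Driver and keeps the old name only as a deprecated alias. A rough sketch of picking the class from a version string follows. MysqlDriver and classNameFor are hypothetical names, and the cutoff at major version 8 assumes that only 5.x and 8.x or later releases are in use.

```scala
// Hypothetical helper: maps a Connector/J version string to a driver class name.
// Assumption: only 5.x and 8.x (or later) releases are considered.
object MysqlDriver {
  def classNameFor(connectorVersion: String): String = {
    // Leading digits of the version string, e.g. "8" for "8.0.19"
    val major = connectorVersion.trim.takeWhile(_.isDigit)
    if (major.nonEmpty && major.toInt >= 8) "com.mysql.cj.jdbc.Driver"
    else "com.mysql.jdbc.Driver"
  }
}
```

The result can be passed to properties.setProperty("driver", ...) or to the "driver" option in place of the hard-coded string.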

4. Results:

 
