Spark SQL: Reading Data from a DB in Scala

TendToBigData 2017-01-17 22:06

// Note: the JDBC Connection/Statement types live in java.sql, not com.mysql.jdbc;
// only DriverManager is needed here since the concrete types are inferred.
import java.sql.DriverManager
import java.util.{ArrayList, HashMap}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField}

/**
 * @author three
 */
object JDBCDataSource {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("JDBCDataSource").setMaster("local[1]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Connection options shared by both JDBC reads; only "dbtable" changes.
    val options = new HashMap[String, String]()
    options.put("url", "jdbc:mysql://192.168.5.111:3306/testdb")
    options.put("user", "spark")
    options.put("password", "spark2016")

    options.put("dbtable", "student_info")
    val studentInfosDF = sqlContext.read.format("jdbc").options(options).load()

    options.put("dbtable", "student_score")
    val studentScoresDF = sqlContext.read.format("jdbc").options(options).load()

    // Turn both DataFrames into pair RDDs keyed by student name, then join them
    // (a join requires a common key).
    val rdd1 = studentInfosDF.map(x => (x.getString(0), x.getInt(1)))   // (name, age)
    val rdd2 = studentScoresDF.map(x => (x.getString(0), x.getInt(1)))  // (name, score)
    val studentsRDD = rdd1.join(rdd2)                                   // (name, (age, score))

    // Flatten to (name, age, score), keep scores above 80, and wrap each tuple in a Row.
    val filteredStudentRowsRDD = studentsRDD
      .map { case (name, (age, score)) => (name, age, score) }
      .filter(_._3 > 80)
      .map(x => Row(x._1, x._2, x._3))

    // Build a schema and convert the filtered RDD[Row] back into a DataFrame.
    val structFields = new ArrayList[StructField]()
    structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true))
    structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true))
    structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true))

    val structType = DataTypes.createStructType(structFields)
    val studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType)

    // Print the result.
    studentsDF.collect().foreach(println)

    // Save the DataFrame to a MySQL table. For clarity this opens one
    // connection per row; see the foreachPartition sketch below for the
    // batched variant you would want at any real scale.
    studentsDF.foreach { row =>
      Class.forName("com.mysql.jdbc.Driver")
      val conn = DriverManager.getConnection("jdbc:mysql://192.168.5.111:3306/testdb", "spark", "spark2016")
      try {
        // A PreparedStatement avoids building SQL by string concatenation
        // (and with it, quoting bugs and SQL injection).
        val stat = conn.prepareStatement("insert into good_student_info values(?, ?, ?)")
        stat.setString(1, row.getString(0))
        stat.setInt(2, row.getInt(1))
        stat.setInt(3, row.getInt(2))
        stat.executeUpdate()
        stat.close()
      } finally {
        conn.close()
      }
    }
  }
}
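
Opening a connection per row round-trips to MySQL once per insert. A more scalable variant (a sketch, assuming the same studentsDF, table, and credentials as above) opens one connection per partition and sends that partition's inserts as a single JDBC batch:

import java.sql.DriverManager

// Sketch: one connection and one batch per partition instead of per row.
studentsDF.foreachPartition { rows =>
  Class.forName("com.mysql.jdbc.Driver")
  val conn = DriverManager.getConnection("jdbc:mysql://192.168.5.111:3306/testdb", "spark", "spark2016")
  try {
    val stat = conn.prepareStatement("insert into good_student_info values(?, ?, ?)")
    rows.foreach { row =>
      stat.setString(1, row.getString(0))
      stat.setInt(2, row.getInt(1))
      stat.setInt(3, row.getInt(2))
      stat.addBatch()
    }
    stat.executeBatch() // one round trip per partition
    stat.close()
  } finally {
    conn.close()
  }
}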

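Since Spark 1.4 the hand-written JDBC plumbing can also be dropped entirely: DataFrameReader.jdbc reads a table into a DataFrame and DataFrameWriter.jdbc writes one out. A minimal sketch against the same database (same url, tables, and credentials as in the listing, reusing the sqlContext and studentsDF defined there):

import java.util.Properties
import org.apache.spark.sql.SaveMode

val props = new Properties()
props.put("user", "spark")
props.put("password", "spark2016")

val url = "jdbc:mysql://192.168.5.111:3306/testdb"
val studentInfosDF  = sqlContext.read.jdbc(url, "student_info", props)
val studentScoresDF = sqlContext.read.jdbc(url, "student_score", props)

// After joining and filtering as before, append the result in one call.
studentsDF.write.mode(SaveMode.Append).jdbc(url, "good_student_info", props)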
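One last idiom note: since this is Scala, the schema does not need a java.util.ArrayList. A StructType equivalent to the DataTypes.createStructType calls above can be built from a Scala Seq directly:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name",  StringType,  nullable = true),
  StructField("age",   IntegerType, nullable = true),
  StructField("score", IntegerType, nullable = true)))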