1. 程式人生 > >SparkSQL(6)——Spark SQL JDBC

SparkSQL(6)——Spark SQL JDBC

Spark SQL可以通過JDBC從關係型資料庫中讀取資料的方式建立DataFrame。 通過對DataFrame一系列的計算後,還可以將資料再寫回關係型資料庫中。

SparkSQL從MySQL中載入資料

package com.fgm.sparksql

import java.util.Properties

import org.apache.spark.sql.SparkSession

/**
  *通過sparksql讀取mysql表中的資料
  *
  * @Auther: fgm
  */
object DataFromMysql {
  def main(args: Array[String]): Unit = {
    //建立物件
    val spark = SparkSession.builder().appName("DataFromMysql").master("local[2]").getOrCreate()
    //通過sparkSession物件載入mysql中的資料
    val url="jdbc:mysql://localhost:3306/spark"
    //定義表名
    val table="test"
    //properties
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")
    val jdbc = spark.read.jdbc(url,table,properties)

    jdbc.printSchema()
    jdbc.show()
    jdbc.createTempView("test")
    spark.sql("select * from test").show()
    spark.stop()
  }
}

SparkSQL向MySQL中寫入資料

package com.fgm.sparksql

import java.util.Properties
import org.apache.spark.sql.SparkSession

/**
  *通過sparksql把結果資料寫入到mysql表
  * @Auther: fgm
  */
case class User(val id:Int,val name:String,val age:Int)

object DataToMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataToMysql").master("local[2]").getOrCreate()
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")

    //讀取資料檔案
    val RDD1 = sc.textFile("D:\\tmp\\user.txt").map(_.split(" "))
    //將RDD與樣例類關聯
    val userRDD = RDD1.map(x=>User(x(0).toInt,x(1),x(2).toInt))
    //構建DataFrame
    import spark.implicits._
    val df = userRDD.toDF()
    df.printSchema()
    df.show()

    df.createTempView("user")
    val result = spark.sql("select * from user where age >30")
    //定義表名
    val table="user"
    //將結果寫入到mysql
    //定義資料庫url
    val url="jdbc:mysql://localhost:3306/spark"
    //properties
    val properties=new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123")

    result.write.mode("append").jdbc(url,table,properties)
  
    //再將資料庫中的資料讀取出來,檢查是否寫入成功,也可以進行其他相關操作
    //val jdbc=spark.read.jdbc(url,table,properties)
    //jdbc.show()
    
    spark.stop()
  }
}

D:\tmp\user.txt

1 zhangsan 20
2 lisi 29
3 wangwu 33
4 zhaoliu 30
5 hahaha 44

未註釋讀取的程式碼時,資料如下: 在這裡插入圖片描述

並且檢視資料庫發現,新建的user表中已經有了資料。 在這裡插入圖片描述

注意:以上程式碼,都可以打成jar包之後在叢集中執行。引數(如:檔案url,以及table等,)可以通過args(0)等方式傳入,不要寫死在程式碼裡。