
Spark-SQL: Reading and Writing via JDBC


  • Reading data over JDBC

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

object CreateDataFrameFromJDBC {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val properties = new Properties()
    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    // A DataFrame must have schema information.
    // When read.jdbc is called at runtime, it must fetch the table's metadata from the database:
    // read.jdbc queries the database from the Driver side and uses the returned schema
    // of the specified table as the DataFrame's schema.
    val df: DataFrame = spark.read.jdbc("jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8", "tb_user", properties)

    df.printSchema()
    df.show()
    spark.stop()
  }
}
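Note that read.jdbc fetches the schema on the Driver, and by default the whole table is read through a single connection by one task. For large tables, the partitioned overload of read.jdbc spreads the read across executors. Below is a minimal sketch, assuming (hypothetically) that tb_user has a numeric id column spanning roughly 1 to 10000:

// Partitioned read: Spark generates one query per partition with a WHERE
// clause derived from the bounds, e.g. "id >= 1 AND id < 2500".
// Assumes (hypothetically) a numeric id column in roughly [1, 10000].
val partitionedDf: DataFrame = spark.read.jdbc(
  "jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8",
  "tb_user",
  "id",    // partition column (must be numeric, date, or timestamp)
  1L,      // lowerBound
  10000L,  // upperBound
  4,       // numPartitions
  properties
)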
  • Writing to MySQL

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

object WriteJDBC {
  def main(args: Array[String]): Unit = {

    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    val sc = spark.sparkContext

    val lines: RDD[String] = sc.textFile("src/main/scala/data/user.txt")
    // A Row's fields have no names and no types yet
    val rdd1: RDD[Row] = lines.map(e => {
      val split = e.split(",")
      Row(split(0), split(1).toInt, split(2).toDouble)
    })
    // Define the schema (field name, field type, nullable)
    val schema: StructType = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("fv", DoubleType)
      )
    )
    // Associate the Row RDD with the schema in the StructType
    val df: DataFrame = spark.createDataFrame(rdd1, schema)

    val properties = new Properties()

    properties.setProperty("user","root")
    properties.setProperty("password","123456")

    // ErrorIfExists: throw an error if the output directory or table already exists
    // Append: append to the existing data
    // Overwrite: drop the existing directory or table, recreate it, then write the data
    // Ignore: write if the target does not exist; do nothing if it does
    df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8","tb_user",properties)

    spark.stop()

  }
}
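One caveat with SaveMode.Overwrite: by default it drops and recreates the table, discarding any indexes and column types defined on the MySQL side. A minimal sketch of the equivalent option-based writer, using the truncate option so that Overwrite empties the table instead of dropping it (same connection details as above):

// Option-based JDBC writer, equivalent to df.write.jdbc(...).
// With "truncate" set, Overwrite issues TRUNCATE TABLE instead of
// DROP TABLE, so the existing MySQL table definition is preserved.
df.write
  .format("jdbc")
  .mode(SaveMode.Overwrite)
  .option("url", "jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8")
  .option("dbtable", "tb_user")
  .option("user", "root")
  .option("password", "123456")
  .option("truncate", "true")
  .save()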