1. 程式人生 > >spark sql jdbc資料來源 多種輸出方式

spark sql jdbc資料來源 多種輸出方式

package com.ws.jdbc
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
  * spark sql jdbc資料來源
  */
object JdbcDataSource {

  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().appName("JdbcDataSource").master("local[*]").getOrCreate()

    //連線資料庫,讀取表結構,load也是Transformation
    val db: DataFrame = sparkSession.read.format("jdbc").options(
      Map("url" -> "jdbc:mysql://hadoop-03:3306/ip?charatorEncoding=utf-8",
        "driver" -> "com.mysql.jdbc.Driver",
        "dbtable" -> "ipcount",
        "user" -> "root",
        "password" -> "root")).load()

    //sql方式
    //db.createTempView("t_ipcount")
    //val result = sparkSession.sql("select * from t_ipcount where id > 15")

    //dataFrame 方式
    val result = db.filter(r => {
      r.getAs[Int]("id") > 15
    })

    import sparkSession.implicits._
    //lambda表示式
    //val result = db.filter($"id" >= 15)

    //將結果寫入mysql
    val p = new Properties()
    p.put("user", "root")
    p.put("password", "root")
    // ignore : 如果表存在,不做任何操作;表不存在,建立表並將資料寫進去
    result.write.mode("append").jdbc("jdbc:mysql://hadoop-03:3306/ip?charatorEncoding=utf-8", "ipcount2", p)

    //json輸出目錄
    result.write.json("E:\\bigData\\testdata\\ip3")

    //csv格式輸出目錄
    result.write.csv("E:\\bigData\\testdata\\ip2")

    //儲存到檔案只能儲存一列,多個欄位報錯,且欄位型別必須為string
    result.write.text("E:\\bigData\\testdata\\ip4")

    //也可以指定hdfs路徑
    result.write.parquet("E:\\bigData\\testdata\\ip4")

    //result.show()
    sparkSession.stop()
  }
}