spark sql jdbc資料來源 多種輸出方式
阿新 • • 發佈:2018-11-10
package com.ws.jdbc import java.util.Properties import org.apache.spark.sql.{DataFrame, SparkSession} /** * spark sql jdbc資料來源 */ object JdbcDataSource { def main(args: Array[String]): Unit = { val sparkSession = SparkSession.builder().appName("JdbcDataSource").master("local[*]").getOrCreate() //連線資料庫,讀取表結構,load也是Transformation val db: DataFrame = sparkSession.read.format("jdbc").options( Map("url" -> "jdbc:mysql://hadoop-03:3306/ip?charatorEncoding=utf-8", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "ipcount", "user" -> "root", "password" -> "root")).load() //sql方式 //db.createTempView("t_ipcount") //val result = sparkSession.sql("select * from t_ipcount where id > 15") //dataFrame 方式 val result = db.filter(r => { r.getAs[Int]("id") > 15 }) import sparkSession.implicits._ //lambda表示式 //val result = db.filter($"id" >= 15) //將結果寫入mysql val p = new Properties() p.put("user", "root") p.put("password", "root") // ignore : 如果表存在,不做任何操作;表不存在,建立表並將資料寫進去 result.write.mode("append").jdbc("jdbc:mysql://hadoop-03:3306/ip?charatorEncoding=utf-8", "ipcount2", p) //json輸出目錄 result.write.json("E:\\bigData\\testdata\\ip3") //csv格式輸出目錄 result.write.csv("E:\\bigData\\testdata\\ip2") //儲存到檔案只能儲存一列,多個欄位報錯,且欄位型別必須為string result.write.text("E:\\bigData\\testdata\\ip4") //也可以指定hdfs路徑 result.write.parquet("E:\\bigData\\testdata\\ip4") //result.show() sparkSession.stop() } }