Spark-SQL 讀寫 JDBC
阿新 • • 發佈:2021-01-05
-
讀取 JDBC 中的資訊
import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Demo: create a DataFrame by reading a MySQL table over JDBC.
 */
object CreateDataFrameFromJDBC {

  def main(args: Array[String]): Unit = {
    // Build a local SparkSession; the app name is derived from this object's class name.
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()

    // JDBC connection credentials.
    // NOTE(review): hard-coded credentials are fine for a demo only — externalize for real use.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    // Schema information is mandatory for a DataFrame.
    // read.jdbc queries the database from the Driver at call time and uses the
    // returned table metadata as the DataFrame's schema.
    val df: DataFrame = spark.read.jdbc(
      "jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8",
      "tb_user",
      properties)

    df.printSchema()
    df.show()

    spark.stop()
  }
}
-
寫入到 MySQL 中
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}

/**
 * Demo: build a DataFrame from a comma-separated text file and append it
 * to a MySQL table over JDBC.
 */
object WriteJDBC {

  def main(args: Array[String]): Unit = {
    // Local SparkSession for the demo.
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Input lines are expected to look like "name,age,fv".
    val lines: RDD[String] = sc.textFile("src/main/scala/data/user.txt")

    // Rows alone carry no field names and no field types.
    val rowRdd: RDD[Row] = lines.map { line =>
      val fields = line.split(",")
      Row(fields(0), fields(1).toInt, fields(2).toDouble)
    }

    // Schema supplies each column's name, type and nullability.
    val schema: StructType = StructType(
      Array(
        StructField("name", StringType),
        StructField("age", IntegerType),
        StructField("fv", DoubleType)
      )
    )

    // Bind the Row RDD to the schema, producing a typed DataFrame.
    val df: DataFrame = spark.createDataFrame(rowRdd, schema)

    // JDBC connection credentials.
    val properties = new Properties()
    properties.setProperty("user", "root")
    properties.setProperty("password", "123456")

    // SaveMode semantics:
    //   ErrorIfExists — fail if the target directory/table already exists
    //   Append        — append rows to the existing target
    //   Overwrite     — drop the existing target, recreate it, then write
    //   Ignore        — write only if the target does not exist; otherwise do nothing
    df.write.mode(SaveMode.Append).jdbc(
      "jdbc:mysql://localhost:3306/demo01?characterEncoding=utf8",
      "tb_user",
      properties)

    sc.stop()
    spark.stop()
  }
}