Mysql學習(三)Spark(Scala)寫入Mysql的兩種方式
阿新 • • 發佈:2018-12-25
package total import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import java.util.Properties import org.apache.spark.rdd.RDD import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} object CountProvince { def main(args: Array[String]): Unit = { /** * 第一種寫入Mysql方式 */ /** * 第一步判斷引數個數 */ if (args.length < 2){ println( """ |total.CountProvince<inputFilePath><outputFilePath> |<inputFilePath> 輸入的檔案路徑 |<outputFilePath> 輸出的檔案路徑 """.stripMargin ) System.exit(0) } /** * 接收引數 */ val Array(inputFile,outputFile) = args val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}") // val sc = new SparkContext(conf) // val input = ("F:\\Scala\\Spark\\第二十天-dmp專案\\資料\\out1") /** * 建立SparkSession */ val session: SparkSession = SparkSession.builder().config(conf).getOrCreate() /** * 讀取檔案的資料 */ val df: DataFrame = session.sqlContext.read.parquet(inputFile) /** * 建立一個臨時表 */ df.createOrReplaceTempView("countProvince") /** * 統計出來count */ val sql = """ select count(*),provincename, cityname from countProvince group by provincename, cityname order by provincename""" val dff = session.sqlContext.sql(sql) val url = "jdbc:mysql://192.168.123.102:3306/test" val properties = new Properties() properties.put("user","root") properties.put("password","root") dff.write.jdbc(url,"count",properties) /** * 第二種寫入Mysql方式 */ // val conf = new SparkConf().setMaster("local[2]").setAppName("CountProvince") //// val sc = new SparkContext(conf) // val Spark: SparkSession = SparkSession.builder().config(conf).getOrCreate() // val sc = Spark.sparkContext // /** // * 讀取檔案 // */ // val inputPath = sc.textFile("F:\\Scala\\Spark\\第二十天-dmp專案\\資料\\data.txt") // inputPath.foreach(println(_)) // /** // * 計算count // */ // val countRDD = inputPath // .map(line => { // val fields = line.split(",") // (fields(24) + ":" + fields(25), 1) // }).reduceByKey(_ + _) // // /** // * 用Row來轉換 // */ // val RowRdd: RDD[Row] = countRDD.map(tuple => { // val diming = tuple._1 // val count = tuple._2 // Row(diming, count) // }) // /** // * 再用schema // */ // val schema: StructType = StructType( // StructField("diming", StringType, true) :: // StructField("count", LongType, true) :: Nil // ) // /** // * 把 Row和 schema 合併成為 DataFrame // */ // val df: DataFrame = Spark.createDataFrame(RowRdd,schema) // // /** // * 建立一個臨時表 // */ // df.createOrReplaceTempView("countProvince") // // /** // * 把結果持久化到資料庫 // */ // val url = "jdbc:mysql://192.168.123.102:3306/test" // val properties = new Properties() // properties.put("user","root") // properties.put("password","root") // df.write.jdbc(url,"count2",properties) } }