1. 程式人生 > >Mysql學習(三)Spark(Scala)寫入Mysql的兩種方式

Mysql學習(三)Spark(Scala)寫入Mysql的兩種方式

package total
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object CountProvince {
  def main(args: Array[String]): Unit = {

    /**
      * 第一種寫入Mysql方式
      */


    /**
      * 第一步判斷引數個數
       */
    if (args.length < 2){
      println(
        """
          |total.CountProvince<inputFilePath><outputFilePath>
          |<inputFilePath> 輸入的檔案路徑
          |<outputFilePath> 輸出的檔案路徑
        """.stripMargin
      )
      System.exit(0)
    }
    /**
      * 接收引數
      */
    val Array(inputFile,outputFile) = args

    val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}")
    //    val sc = new SparkContext(conf)
//    val input = ("F:\\Scala\\Spark\\第二十天-dmp專案\\資料\\out1")
    /**
      * 建立SparkSession
      */
    val session: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    /**
      * 讀取檔案的資料
      */
    val df: DataFrame = session.sqlContext.read.parquet(inputFile)

    /**
      * 建立一個臨時表
      */
    df.createOrReplaceTempView("countProvince")
    /**
      * 統計出來count
      */
    val  sql =
      """
          select
              count(*),provincename, cityname
          from
              countProvince
          group by
              provincename, cityname
          order by
              provincename"""
    val dff = session.sqlContext.sql(sql)
    val url = "jdbc:mysql://192.168.123.102:3306/test"
    val properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    dff.write.jdbc(url,"count",properties)
    


    /**
      * 第二種寫入Mysql方式
      */
//    val conf = new SparkConf().setMaster("local[2]").setAppName("CountProvince")
////    val sc = new SparkContext(conf)
//    val Spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//    val sc = Spark.sparkContext
//    /**
//      * 讀取檔案
//      */
//    val inputPath =  sc.textFile("F:\\Scala\\Spark\\第二十天-dmp專案\\資料\\data.txt")
//    inputPath.foreach(println(_))
//    /**
//      * 計算count
//      */
//    val countRDD = inputPath
//      .map(line => {
//        val fields = line.split(",")
//        (fields(24) + ":" + fields(25), 1)
//      }).reduceByKey(_ + _)
//
//    /**
//      * 用Row來轉換
//      */
//    val RowRdd: RDD[Row] = countRDD.map(tuple => {
//      val diming = tuple._1
//      val count = tuple._2
//      Row(diming, count)
//    })
//    /**
//      * 再用schema
//      */
//    val schema: StructType = StructType(
//      StructField("diming", StringType, true) ::
//        StructField("count", LongType, true) :: Nil
//    )
//    /**
//      * 把 Row和 schema 合併成為 DataFrame
//      */
//    val df: DataFrame = Spark.createDataFrame(RowRdd,schema)
//
//    /**
//      * 建立一個臨時表
//      */
//    df.createOrReplaceTempView("countProvince")
//
//    /**
//      * 把結果持久化到資料庫
//      */
//    val url = "jdbc:mysql://192.168.123.102:3306/test"
//    val properties = new Properties()
//    properties.put("user","root")
//    properties.put("password","root")
//    df.write.jdbc(url,"count2",properties)

  }
}