程式人生 > Spark-sql 1.x版

Spark-sql 1.x版

package Test01

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext, SparkSession}

/**
  * One employee record from the comma-separated emp data set.
  *
  * When reading data with DataFrames or SQL, unstructured text must first be
  * turned into structured records like this one; the DataFrame is then
  * registered as a view, SQL is executed against it, and finally an action
  * is triggered to run the job.
  */
case class Emp(
    empno: Int,      // employee number
    ename: String,   // employee name
    job: String,     // job title
    mgr: String,     // manager's employee number (kept as text in the source file)
    hiredate: String,
    sal: Int,        // salary
    comm: String,    // commission (kept as text; may be empty in the source file)
    deptno: Int      // department number
)

object SparkSql01 {
  /**
    * Spark SQL 1.x style demo: reads a comma-separated employee file
    * (path supplied as args(0)), maps every line onto the Emp case class,
    * converts the resulting RDD to a DataFrame via implicit conversions,
    * and shows it.
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usable message instead of an
    // ArrayIndexOutOfBoundsException at sc.textFile(args(0)) below.
    require(args.nonEmpty, "Usage: SparkSql01 <input-path>")

    val conf = new SparkConf().setAppName("SparkSql01").setMaster("local[*]")
    val sc = new SparkContext(conf)
    // SQLContext wraps SparkContext and adds the ability to process structured data.
    val sqlContext = new SQLContext(sc)

    // Read the data source and split each CSV line into its fields.
    val lines: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(","))
    // Map the raw fields onto the Emp case class, giving the data a schema.
    // NOTE(review): assumes every line has exactly 8 comma-separated fields
    // with numeric empno/sal/deptno — malformed lines will fail the job.
    val allEmp = lines.map(x => Emp(x(0).toInt, x(1), x(2), x(3), x(4), x(5).toInt, x(6), x(7).toInt))

    // Import the implicit conversions that enable RDD -> DataFrame (.toDF()).
    import sqlContext.implicits._
    val df1 = allEmp.toDF()
    df1.show()

    // Release resources.
    sc.stop()
  }
}

 

第二種寫法

package Test01

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SQLDemox2 {
  /**
    * Second way of writing the same job: instead of mapping onto a case
    * class, build an RDD[Row] and attach an explicit StructType schema,
    * then create the DataFrame with sqlContext.createDataFrame(rowRDD, schema).
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usable message instead of an
    // ArrayIndexOutOfBoundsException at sc.textFile(args(0)) below.
    require(args.nonEmpty, "Usage: SQLDemox2 <input-path>")

    // Entry point of a Spark RDD program.
    val conf = new SparkConf().setAppName("SQLDemox2").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // SQLContext wraps SparkContext and adds the ability to process structured data.
    val sqlContext = new SQLContext(sc)

    // Read the data source and split each CSV line into its fields.
    val lines: RDD[Array[String]] = sc.textFile(args(0)).map(_.split(","))

    // Organize the data as Row + schema.
    // NOTE(review): assumes every line has exactly 8 comma-separated fields
    // with numeric empno/sal/deptno — malformed lines will fail the job.
    val rowRDD: RDD[Row] = lines.map(line => {
      Row(line(0).toInt, line(1), line(2), line(3), line(4), line(5).toInt, line(6), line(7).toInt)
    })

    // Build the table structure (field names must match the Row field order above).
    val schema = StructType(
      List(
        StructField("empno", IntegerType),
        StructField("ename", StringType),
        StructField("job", StringType),
        StructField("mgr", StringType),
        StructField("hiredate", StringType),
        StructField("sal", IntegerType),
        StructField("comm", StringType),
        StructField("deptno", IntegerType)
      )
    )

    // Associate the RDD with the schema to obtain a DataFrame.
    val df: DataFrame = sqlContext.createDataFrame(rowRDD, schema)

    val result = df.select("empno", "ename", "sal")
    result.show()

    // Release resources.
    sc.stop()
  }
}