SparkSQL Getting Started Example 4 (SparkSQL 2.x)

The previous examples all used the SparkSQL 1.x API, so this one walks through programming against the SparkSQL 2.x API.

The full code, built on top of the earlier examples, is shown below; it reads a text file whose path is passed as the first program argument.
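For reference, the program expects a comma-separated input file with four fields per line: id (Long), name (String), age (Int) and yz (Double). The rows below are made up purely to illustrate the format, not real data:

1,user1,18,99.5
2,user2,30,98.7
3,user3,28,96.0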

package cn.ysjh0014.SparkSql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object SparkSqlTest1 {

  def main(args: Array[String]): Unit = {

    //The SparkSQL 2.x programming API (SparkSession)
    //SparkSession is the unified entry point in SparkSQL 2.x
    val session: SparkSession = SparkSession.builder().appName("SqlTest1").master("local[4]").getOrCreate() //getOrCreate() creates the SparkSession (or returns an existing one)

    //Create an RDD from the input text file; the path is passed as the first program argument
    val lines: RDD[String] = session.sparkContext.textFile(args(0))

    //Parse the data: split each line into typed fields and wrap them in a Row
    val rowRdd: RDD[Row] = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      val age = fields(2).toInt
      val yz = fields(3).toDouble
      Row(id, name, age, yz)
    })

    //The result schema, essentially the table header, which describes the DataFrame
    val sm: StructType = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("yz", DoubleType, true)
    ))

    //Attach the schema to the row RDD to obtain a DataFrame
    val df: DataFrame = session.createDataFrame(rowRdd, sm)

    //Import the session's implicits to enable the $"column" syntax
    import session.implicits._
    //DSL-style query: keep rows with yz > 98 and sort by age in descending order
    val df1: Dataset[Row] = df.where($"yz" > 98).orderBy($"age".desc)

    df1.show()

    //Release the resources held by the SparkSession
    session.stop()
  }
}
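Besides the DSL calls above, the SparkSQL 2.x API also lets you run the same query as a plain SQL string by first registering the DataFrame as a temporary view. A minimal sketch, reusing the df built above (the view name "person" is chosen here purely for illustration):

//Register the DataFrame as a temporary view so it can be referenced in SQL
df.createOrReplaceTempView("person")

//Run the equivalent query through the SparkSession; the result is again a DataFrame
val df2: DataFrame = session.sql("SELECT id, name, age, yz FROM person WHERE yz > 98 ORDER BY age DESC")
df2.show()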

As you can see, the key difference is the entry point: in SparkSQL 2.x you build a SparkSession, whereas in 1.x you build a SparkContext and then create a SQLContext from it.
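To make the contrast concrete, here is a minimal sketch of the two entry points side by side (the app name and master below are placeholders, not taken from the original examples):

//SparkSQL 1.x: build a SparkContext first, then wrap it in a SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("SqlTest").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

//SparkSQL 2.x: a single SparkSession replaces both of them as the unified entry point
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SqlTest").master("local[4]").getOrCreate()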