1. 程式人生 > >spark-sql的概述以及程式設計模型的介紹

spark-sql的概述以及程式設計模型的介紹

1、spark sql的概述

(1)spark sql的介紹:

  Spark SQL 是 Spark 用來處理結構化資料(結構化資料可以來自外部結構化資料來源也可以通 過 RDD 獲取)的一個模組,它提供了一個程式設計抽象叫做 DataFrame 並且作為分散式 SQL 查 詢引擎的作用。
  外部的結構化資料來源包括 JSON、Parquet(預設)、RMDBS、Hive 等。當前 Spark SQL 使用 Catalyst 優化器來對 SQL 進行優化,從而得到更加高效的執行方案。並且可以將結果儲存到外部系統。

(2)spark sql的特點:

   - 容易整合
   - 統一的資料訪問方式
   - 相容hive
   - 標準的資料連線

(3)關於spark sql的版本迭代:

   - spark sql 的前身是shark。但是spark sql拋棄了原有shark的程式碼,汲取了shark的一些優點,如:列儲存(In-Memory Columnar Storage)、Hive 相容性等,重新開發 SparkSQL。
   - spark -1.1 2014 年 9 月 11 日,釋出 Spark1.1.0。Spark 從 1.0 開始引入 SparkSQL(Shark 不再支援升級與維護)。Spark1.1.0 變化較大是 SparkSQL 和 MLlib
   - spark -1.3 增加了dataframe新
   - spark -1.4 增加了視窗分析函式
   - spark - 1.5 鎢絲計劃。Hive 中有 UDF 與 UDAF,Spark 中對 UDF 支援較早
   - spark 1.6 執行的 sql 中可以增加"--"註釋,Spark-1.5/1.6 的新特性,引入 DataSet 的概念
   - spark 2.x SparkSQL+DataFrame+DataSet(正式版本),Structured Streaming(DataSet),引入 SparkSession 統一了 RDD,DataFrame,DataSet 的程式設計入口

2、spark sql的程式設計模型

(1)sparkSession的介紹:

  SparkSession 是 Spark-2.0 引如的新概念。SparkSession 為使用者提供了統一的切入點,來讓使用者學習 Spark 的各項功能。
  隨著 DataSet 和 DataFrame 的 API 逐漸成為標準的 API,SparkSession 作為 DataSet 和 DataFrame API 的切入點,SparkSession 封裝了 SparkConf、SparkContext 和 SQLContext。為了向後相容,SQLContext 和 HiveContext 也被儲存下來。
  特點:


   - 為使用者提供一個統一的切入點使用 Spark 各項功能
   - 允許使用者通過它呼叫 DataFrame 和 Dataset 相關 API 來編寫程式
   - 減少了使用者需要了解的一些概念,可以很容易的與 Spark 進行互動
   - 與 Spark 互動之時不需要顯示的建立 SparkConf、SparkContext 以及 SQlContext,這些對 象已經封閉在 SparkSession 中
   - SparkSession 提供對 Hive 特徵的內部支援:用 HiveQL 寫 SQL 語句,訪問 Hive UDFs,從 Hive 表中讀取資料
   SparkSession的建立
  在spark-shell中SparkSession 會被自動初始化一個物件叫做 spark,為了向後相容,Spark-Shell 還提供了一個 SparkContext 的初始化物件,方便使用者操作:
spark-sql的概述以及程式設計模型的介紹
  在程式碼開發的時候建立

val conf = new SparkConf()
val spark: SparkSession = SparkSession.builder()
  .appName("_01spark_sql")
  .config(conf)
  .getOrCreate()

(2)RDD:

這裡主要說的是RDD的侷限性:
  - RDD是不支援spark-sql的
   - RDD 僅表示資料集,RDD 沒有元資料,也就是說沒有欄位語義定義
   - RDD 需要使用者自己優化程式,對程式設計師要求較高
   - 從不同資料來源讀取資料相對困難,讀取到不同格式的資料都必須使用者自己定義轉換方式 合併多個數據源中的資料也較困難

(3)DataFrame:

  DataFrame 被稱為 SchemaRDD。以行為單位構成的分散式資料集合,按照列賦予不同的名稱。對 select、fileter、aggregation 和 sort 等操作符的抽象。其中 Schema 是就是元資料,是語義描述資訊。DataFrame是分散式的Row物件的集合.
  DataFrame = RDD+Schema = SchemaRDD
   優勢
   - DataFrame 是一種特殊型別的 Dataset,DataSet[Row] = DataFrame
   - DataFrame 自帶優化器 Catalyst,可以自動優化程式
   - DataFrame 提供了一整套的 Data Source API
   特點
   - 支援 單機 KB 級到叢集 PB 級的資料處理
   - 支援多種資料格式和儲存系統
   - 通過 Spark SQL Catalyst 優化器可以進行高效的程式碼生成和優化
   - 能夠無縫整合所有的大資料處理工具
   - 提供 Python, Java, Scala, R 語言 API

(4)DataSet:

   由於 DataFrame 的資料型別統一是 Row,所以 DataFrame 也是有缺點的。Row 執行時型別檢查,比如 salary 是字串型別,下面語句也只有執行時才進行型別檢查。 dataframe.filter("salary>1000").show()

   Dataset擴充套件了 DataFrame API,提供了編譯時型別檢查,面向物件風格的 API。
   Dataset 可以和 DataFrame、RDD 相互轉換。DataFrame=Dataset[Row],可見 DataFrame 是一種特殊的 Dataset。

(5)DataSet和DataFrame的區別?

   這裡小編要重點強調一下二者的區別,但是在學習spark-sql的時候就對二者的關係不太清楚,而且在面試的時候也問到了這個問題,真的是一番血淚史啊。
   通過檢視多個前輩對二者的總結我大概的總結一下二者的區別:
   - Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record儲存的是一個強型別值而不是一個Row
   - DataSet可以在編譯時檢查型別,而DataFrame只有在正真執行的時候才會檢查
   - DataFrame每一行的型別都是Row,不解析我們就無法知曉其中有哪些欄位,每個欄位又是什麼型別。我們只能通過getAs[型別]或者row(i)的方式來獲取特定的欄位內容(超級大弊端);而dataSet每一行的型別是不一定的,在自定義了case class之後就可以很自由的獲取每一行的資訊。

好了 廢話說了一堆,不如直接上程式碼:

object SparkSqlTest {
    def main(args: Array[String]): Unit = {
        //遮蔽多餘的日誌
        Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
        Logger.getLogger("org.project-spark").setLevel(Level.WARN)
        val conf: SparkConf = new SparkConf()
        conf.setMaster("local[2]")
            .setAppName("SparkSqlTest")
            //設定spark的序列化器
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            //將自定義的物件,加入序列化器中
            .registerKryoClasses(Array(classOf[Person]))
        //構建SparkSession物件
        val spark: SparkSession = SparkSession.builder()
            .config(conf).getOrCreate()
        //建立sparkContext物件
        val sc: SparkContext = spark.sparkContext

        val list = List(
            new Person("委xx", 18),
            new Person("吳xx", 20),
            new Person("戚xx", 30),
            new Person("王xx", 40),
            new Person("薛xx", 18)
        )
        //建立DataFrame
        //構建元資料
        val schema = StructType(List(
            StructField("name", DataTypes.StringType),
            StructField("age", DataTypes.IntegerType)
        ))
        //構建RDD
        val listRDD: RDD[Person] = sc.makeRDD(list)
        val RowRDD: RDD[Row] = listRDD.map(field => {
            Row(field.name, field.age)
        })
        val perDF: DataFrame = spark.createDataFrame(RowRDD,schema)

        //建立DataSet
        import spark.implicits._  //這句話一定要加
        val perDS: Dataset[Person] = perDF.as[Person]

        /**
          * 這裡主要介紹DF 和 DS的區別
          */
        perDF.foreach(field=>{
            val name=field.get(0)  //根據元素的index,取出相應的元素的值
            val age=field.getInt(1)  //根據元素的index和元素的型別取出元素的值
            field.getAs[Int]("age")  //根據元素的型別和元素的名稱取出元素的值
            println(s"${age},${name}")
        })
        perDS.foreach(field=>{
            //直接根據上面定義的元素的名稱取值
            val age=field.age
            val name=field.name
            println(s"${age},${name}")
        })
    }
}
case class Person(name: String, age: Int)

個人感覺,就是DataFrame雖然整合和很多優點,但是,如果想從DataFrame中取出具體的某個物件的某個屬性,是不能確定的,步驟比較繁瑣,而且型別不確定。但是使用DataSet則有效額的避免了所有的問題。