1. 程式人生 > 其它 >SparkSQL資料抽象與執行過程分享

SparkSQL資料抽象與執行過程分享

SparkSQL資料抽象

引入DataFrame

就易用性而言,對比傳統的MapReduce API,Spark的RDD API有了數量級的飛躍並不為過。然而,對於沒有MapReduce和函數語言程式設計經驗的新手來說,RDD API仍然存在著一定的門檻。

另一方面,資料科學家們所熟悉的R、Pandas等傳統資料框架雖然提供了直觀的API,卻侷限於單機處理,無法​大資料培訓​勝任大資料場景。

為了解決這一矛盾,Spark SQL 1.3.0在原有SchemaRDD的基礎上提供了與R和Pandas風格類似的DataFrame API。

新的DataFrame AP不僅可以大幅度降低普通開發者的學習門檻,同時還支援Scala、Java與Python三種語言。更重要的是,由於脫胎自SchemaRDD,DataFrame天然適用於分散式大資料場景。

注意:

DataFrame它不是Spark SQL提出來的,而是早期在R、Pandas語言就已經有了的。

DataFrame是什麼

在Spark中,DataFrame是一種以RDD為基礎的分散式資料集,類似於傳統資料庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和型別。

使得Spark SQL得以洞察更多的結構資訊,從而對藏於DataFrame背後的資料來源以及作用於DataFrame之上的變換進行鍼對性的優化,最終達到大幅提升執行時效率。反觀RDD,由於無從得知所存資料元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

上圖中左側的RDD[Person]雖然以Person為型別引數,但Spark框架本身不瞭解Person類的內部結構。而中間的DataFrame卻提供了詳細的結構資訊,使得Spark SQL可以清楚地知道該資料集中包含哪些列,每列的名稱和型別各是什麼。瞭解了這些資訊之後,Spark SQL的查詢優化器就可以進行鍼對性的優化。後者由於在編譯期有詳盡的型別資訊,編譯期就可以編譯出更加有針對性、更加優化的可執行程式碼。官方定義:

  • Dataset:A DataSet is a distributed collection of data. (分散式的資料集)
  • DataFrame:A DataFrame is a DataSet organized into named columns.(以列(列名,列型別,列值)的形式構成的分散式的資料集,按照列賦予不同的名稱)

DataFrame有如下特性:

1)分散式的資料集,並且以列的方式組合的,相當於具有schema的RDD;

2)相當於關係型資料庫中的表,但是底層有優化;

3)提供了一些抽象的操作,如select、filter、aggregation、plot;

4)它是由於R語言或者Pandas語言處理小資料集的經驗應用到處理分散式大資料集上;

5)在1.3版本之前,叫SchemaRDD;

Schema 資訊

檢視DataFrame中Schema是什麼,執行如下命令:

df.schema

Schema資訊封裝在StructType中,包含很多StructField物件,原始碼。

StructType 定義,是一個樣例類,屬性為StructField的陣列

StructField 定義,同樣是一個樣例類,有四個屬性,其中欄位名稱和型別為必填

自定義Schema結構,官方提供的示例程式碼:

Row

DataFrame中每條資料封裝在Row中,Row表示每行資料。

如何構建Row物件:要麼是傳遞value,要麼傳遞Seq

方式一:下標獲取,從0開始,類似陣列下標獲取如何獲取Row中每個欄位的值呢?

方式二:指定下標,知道型別

方式三:通過As轉換型別

Dataset

引入

Spark在Spark 1.3版本中引入了Dataframe,DataFrame是組織到命名列中的分散式資料集合,但是有如下幾點限制:

編譯時型別不安全:Dataframe API不支援編譯時安全性,這限制了在結構不知道時操縱資料。以下示例在編譯期間有效。但是,執行此程式碼時將出現執行時異常。

無法對域物件(丟失域物件)進行操作:將域物件轉換為DataFrame後,無法從中重新生成它;下面的示例中,一旦我們從personRDD建立personDF,將不會恢復Person類的原始RDD(RDD [Person])。

基於上述的兩點,從Spark 1.6開始出現Dataset,至Spark 2.0中將DataFrame與Dataset合併,其中DataFrame為Dataset特殊型別,型別為Row。

針對RDD、DataFrame與Dataset三者程式設計比較來說,Dataset API無論語法錯誤和分析錯誤在編譯時都能發現,然而RDD和DataFrame有的需要在執行時才能發現。

此外RDD與Dataset相比較而言,由於Dataset資料使用特殊編碼,所以在儲存資料時更加節省記憶體。

總結:

Dataset是在Spark1.6中新增的新的介面,是DataFrame API的一個擴充套件,是Spark最新的資料抽象,結合了RDD和DataFrame的優點。

與RDD相比:儲存了更多的描述資訊,概念上等同於關係型資料庫中的二維表;

與DataFrame相比:儲存了型別資訊,是強型別的,提供了編譯時型別檢查,呼叫Dataset的方法先會生成邏輯計劃,然後被Spark的優化器進行優化,最終生成物理計劃,然後提交到叢集中執行;

Dataset 是什麼

Dataset是一個強型別的特定領域的物件,這種物件可以函式式或者關係操作並行地轉換。

從Spark 2.0開始,DataFrame與Dataset合併,每個Dataset也有一個被稱為一個DataFrame的型別化檢視,這種DataFrame是Row型別的Dataset,即Dataset[Row]。

Dataset API是DataFrames的擴充套件,它提供了一種型別安全的,面向物件的程式設計介面。它是一個強型別,不可變的物件集合,對映到關係模式。在資料集的核心 API是一個稱為編碼器的新概念,它負責在JVM物件和表格表示之間進行轉換。表格表示使用Spark內部Tungsten二進位制格式儲存,允許對序列化資料進行操作並提高記憶體利用率。Spark 1.6支援自動生成各種型別的編碼器,包括基本型別(例如String,Integer,Long),Scala案例類和Java Bean。

針對Dataset資料結構來說,可以簡單的從如下四個要點記憶與理解:

Spark 框架從最初的資料結構RDD、到SparkSQL中針對結構化資料封裝的資料結構DataFrame,最終使用Dataset資料集進行封裝,發展流程如下。

所以在實際專案中建議使用Dataset進行資料封裝,資料分析效能和資料儲存更加好。

SparkSQL底層如何執行

RDD 的執行流程

大致執行步驟:

  • 先將 RDD 解析為由 Stage 組成的 DAG, 後將 Stage 轉為 Task 直接執行

問題:

  • 任務會按照程式碼所示執行, 依賴開發者的優化, 開發者的會在很大程度上影響執行效率

解決辦法:

  • 建立一個元件, 幫助開發者修改和優化程式碼, 但這在 RDD 上是無法實現的

為什麼 RDD 無法自我優化?

  • RDD 沒有 Schema 資訊
  • RDD 可以同時處理結構化和非結構化的資料

SparkSQL 提供了什麼?

和 RDD 不同, SparkSQL 的 Dataset 和 SQL 並不是直接生成計劃交給叢集執行, 而是經過了一個叫做 Catalyst 的優化器, 這個優化器能夠自動幫助開發者優化程式碼。也就是說, 在 SparkSQL 中, 開發者的程式碼即使不夠優化, 也會被優化為相對較好的形式去執行。

為什麼 SparkSQL 提供了這種能力?

首先, SparkSQL 大部分情況用於處理結構化資料和半結構化資料, 所以 SparkSQL 可以獲知資料的 Schema, 從而根據其 Schema 來進行優化。

Catalyst

為了解決過多依賴 Hive 的問題, SparkSQL 使用了一個新的 SQL 優化器替代 Hive 中的優化器, 這個優化器就是 Catalyst, 整個 SparkSQL 的架構大致如下:

1.API 層簡單的說就是 Spark 會通過一些 API 接受 SQL 語句 2.收到 SQL 語句以後, 將其交給 Catalyst, Catalyst 負責解析 SQL, 生成執行計劃等 3.Catalyst 的輸出應該是 RDD 的執行計劃 4.最終交由叢集執行

具體流程:

Step 1 : 解析 SQL, 並且生成 AST (抽象語法樹)

Step 2 : 在 AST 中加入元資料資訊, 做這一步主要是為了一些優化, 例如 col = col 這樣的條件, 下圖是一個簡略圖, 便於理解

  • score.id → id#1#L 為 score.id 生成 id 為 1, 型別是 Long
  • score.math_score → math_score#2#L 為 score.math_score 生成 id 為 2, 型別為 Long
  • people.id → id#3#L 為 people.id 生成 id 為 3, 型別為 Long
  • people.age → age#4#L 為 people.age 生成 id 為 4, 型別為 Long

Step 3 : 對已經加入元資料的 AST, 輸入優化器, 進行優化, 從兩種常見的優化開始, 簡單介紹:

謂詞下推 Predicate Pushdown, 將 Filter 這種可以減小資料集的操作下推, 放在 Scan 的位置, 這樣可以減少操作時候的資料量。

  • 列值裁剪 Column Pruning, 在謂詞下推後, people 表之上的操作只用到了 id 列, 所以可以把其它列裁剪掉, 這樣可以減少處理的資料量, 從而優化處理速度
  • 還有其餘很多優化點, 大概一共有一二百種, 隨著 SparkSQL 的發展, 還會越來越多, 感興趣的同學可以繼續通過原始碼瞭解, 原始碼在 org.apache.spark.sql.catalyst.optimizer.Optimizer

Step 4 : 上面的過程生成的 AST 其實最終還沒辦法直接執行, 這個 AST 叫做 邏輯計劃, 結束後, 需要生成 物理計劃, 從而生成 RDD 來執行。

在生成物理計劃的時候, 會經過成本模型對整棵樹再次執行優化, 選擇一個更好的計劃。

在生成物理計劃以後, 因為考慮到效能, 所以會使用程式碼生成, 在機器中執行。

可以使用 queryExecution 方法檢視邏輯執行計劃, 使用 explain 方法檢視物理執行計劃。

也可以使用 Spark WebUI 進行檢視:

SparkSQL 和 RDD 不同的主要點是在於其所操作的資料是結構化的, 提供了對資料更強的感知和分析能力, 能夠對程式碼進行更深層的優化, 而這種能力是由一個叫做 Catalyst 的優化器所提供的。

Catalyst 的主要運作原理是分為三步, 先對 SQL 或者 Dataset 的程式碼解析, 生成邏輯計劃, 後對邏輯計劃進行優化, 再生成物理計劃, 最後生成程式碼到叢集中以 RDD 的形式執行。