1. 程式人生 > >大資料(Spark-S3-SparkSQL架構及原理)

大資料(Spark-S3-SparkSQL架構及原理)

Spark SQL的發展


HDFS -> HIVE 
由於Hadoop在企業生產中的大量使用,HDFS上積累了大量資料,為了給熟悉RDBMS但又不理解MapReduce的技術人員提供快速上手的工具,Hive應運而生。Hive的原理是將SQL語句翻譯成MapReduce計算。

HIVE -> SHARK
MapReduce計算過程中大量的中間磁碟落地過程消耗了大量的I/O,降低了執行效率,為了提供SQL-on-Hadoop的效率,Shark出現了。

Shark是伯克利AMPLab實驗室Spark生態環境的元件之一,它修改了Hive中的記憶體管理、物理計劃和執行三個模組,使得SQL語句直接執行在Spark上,從而使得SQL查詢的速度得到10-100倍的提升。

SHARK -> SPARK SQL

2014年6月1日,Shark專案和SparkSQL專案的主持人Reynold Xin宣佈:停止對Shark的開發,團隊將所有資源放sparkSQL專案上,至此,Shark的發展畫上了句號。

隨著Spark的發展,Shark對於Hive的太多依賴制約了Spark的One Stack rule them all的方針,制約了Spark各個元件的相互整合,同時Shark也無法利用Spark的特性進行深度優化,所以決定放棄Shark,提出了SparkSQL專案。

隨著Shark的結束,兩個新的專案應運而生:SparkSQL和Hive on Spark。其中SparkSQL作為Spark生態的一員繼續發展,而不再受限於Hive,只是相容Hive;而Hive on Spark是一個Hive的發展計劃,該計劃將Spark作為Hive的底層引擎之一,也就是說,Hive將不再受限於一個引擎,可以採用Map-Reduce、Tez、Spark等引擎。

SparkSQL優勢

SparkSQL擺脫了對Hive的依賴性,無論在資料相容、效能優化、元件擴充套件方面都得到了極大的方便。
1、資料相容方面  
不但相容Hive,還可以從RDD、parquet檔案、JSON檔案中獲取資料,未來版本甚至支援獲取RDBMS資料以及cassandra等NOSQL資料;

2、效能優化方面  
除了採取In-Memory Columnar Storage、byte-code generation等優化技術外、將會引進Cost Model對查詢進行動態評估、獲取最佳物理計劃等等;
    
3、元件擴充套件方面  
無論是SQL的語法解析器、分析器還是優化器都可以重新定義,進行擴充套件;

 

Spark SQL的效能優化

記憶體列儲存(In-Memory Columnar Storage)
對於記憶體列儲存來說,將所有原生資料型別的列採用原生陣列來儲存,將Hive支援的複雜資料型別(如array、map等)先序列化後並接成一個位元組陣列來儲存。

這樣,每個列建立一個JVM物件,從而導致可以快速地GC和緊湊的資料儲存。

額外的,還可以用低廉CPU開銷的高效壓縮方法來降低記憶體開銷。

更有趣的是,對於分析查詢中頻繁使用的聚合特定列,效能會得到很大的提高,原因就是這些列的資料放在一起,更容易讀入記憶體進行計算。

位元組碼生成技術(bytecode generation,即CG)

在資料庫查詢中有個昂貴的操作就是查詢語句中的表示式,主要是由JVM的記憶體模型引起的。如SELECT a+b FROM table,這個查詢裡如果採用通用的SQL語法途徑去處理,會先生成一個表達樹,會多次設計虛擬函式的呼叫,這會打斷CPU的正常流水線處理,減緩執行速度。
    
spark -1.1.0在catalyst模組的expressions增加了codegen模組,如果使用動態位元組碼生成技術,Spark SQL在執行物理計劃時,會對匹配的表示式採用特定的程式碼動態編譯,然後執行。

Scala程式碼的優化

Spark SQL在使用Scala語言編寫程式碼時,應儘量避免容易GC的低效程式碼。儘管增加了編寫程式碼的難度,但對於使用者來說,還是使用了統一的介面,讓開發在使用上更加容易。

Spark SQL的執行架構

SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext現在只支援SQL語法解析器;hiveContext現在支援SQL語法解析器和hivesql語法解析器,預設為hiveSQL語法解析器,使用者可以通過配置切換成SQL語法解析器,來執行hiveSQL不支援的語法。

Spark SQL由Core、Catalyst、Hive、Hive-ThriftServer四部分構成:
1.Core: 負責處理資料的輸入和輸出,如獲取資料,查詢結果輸出成DataFrame等
2.Catalyst: 負責處理整個查詢過程,包括解析、繫結、優化等
3.Hive: 負責對Hive資料進行處理
4.Hive-ThriftServer: 主要用於對hive的訪問

SparkSQL有兩個分支,sqlContext和hiveContext,sqlContext現在只支援SQL語法解析器;hiveContext現在支援SQL語法解析器和hivesql語法解析器,預設為hiveSQL語法解析器,使用者可以通過配置切換成SQL語法解析器,來執行hiveSQL不支援的語法。

Spark SQL語句的執行順序
1.對讀入的SQL語句進行解析(Parse),分辨出SQL語句中哪些詞是關鍵詞(如SELECT、FROM、WHERE),哪些是表示式、哪些是Projection、哪些是Data Source等,從而判斷SQL語句是否規範;
2.將SQL語句和資料庫的資料字典(列、表、檢視等等)進行繫結(Bind),如果相關的Projection、Data Source等都是存在的話,就表示這個SQL語句是可以執行的;
3.一般的資料庫會提供幾個執行計劃,這些計劃一般都有執行統計資料,資料庫會在這些計劃中選擇一個最優計劃(Optimize);
4.計劃執行(Execute),按Operation-->Data Source-->Result的次序來進行的,在執行過程有時候甚至不需要讀取物理表就可以返回結果,比如重新執行剛運

 

Spark SQL的執行原理

1,使用SessionCatalog儲存元資料
在解析SQL語句之前,會建立SparkSession,或者如果是2.0之前的版本初始化SQLContext,SparkSession只是封裝了SparkContext和SQLContext的建立而已。會把元資料儲存在SessionCatalog中,涉及到表名,欄位名稱和欄位型別。建立臨時表或者檢視,其實就會往SessionCatalog註冊。

2,解析SQL使用ANTLR生成未繫結的邏輯計劃
當呼叫SparkSession的SQL或者SQLContext的SQL方法,我們以2.0為準,就會使用SparkSqlParser進行解析SQL。使用的ANTLR進行詞法解析和語法解析。它分為2個步驟來生成Unresolved LogicalPlan:
詞法分析:Lexical Analysis, 負責將token分組成符號類。
構建一個分析樹或者語法樹AST。

3,使用分析器Analyzer繫結邏輯計劃
在該階段,Analyzer會使用Analyzer Rules,並結合SessionCatalog,對未繫結的邏輯計劃進行解析,生成已繫結的邏輯計劃。

4,使用優化器Optimizer優化邏輯計劃
優化器也是會定義一套Rules,利用這些Rule對邏輯計劃和Exepression進行迭代處理,從而使得樹的節點進行合併和優化。

5,使用SparkPlanner生成物理計劃
SparkSpanner使用Planning Strategies,對優化後的邏輯計劃進行轉換,生成可以執行的物理計劃SparkPlan.

6,使用QueryExecution執行物理計劃
此時呼叫SparkPlan的execute方法,底層其實已經再觸發JOB了,然後返回RDD。

 

Spark SQL的執行架構

TreeNode
邏輯計劃、表示式等都可以用tree來表示,它只是在記憶體中維護,並不會進行磁碟的持久化,分析器和優化器對樹的修改只是替換已有節點。

TreeNode有2個直接子類,QueryPlan和Expression。QueryPlan下又有LogicalPlan和SparkPlan. Expression是表示式體系,不需要執行引擎計算而是可以直接處理或者計算的節點,包括投影操作,操作符運算等

Rule & RuleExecutor
Rule就是指對邏輯計劃要應用的規則,以到達繫結和優化。他的實現類就是RuleExecutor。優化器和分析器都需要繼承RuleExecutor。

每一個子類中都會定義Batch、Once、FixPoint. 其中每一個Batch代表著一套規則,Once表示對樹進行一次操作,FixPoint表示對樹進行多次的迭代操作。

RuleExecutor內部提供一個Seq[Batch]屬性,裡面定義的是RuleExecutor的處理邏輯,具體的處理邏輯由具體的Rule子類實現。
 

Catalyst優化器

SparkSQL1.1總體上由四個模組組成:core、catalyst、hive、hive-Thriftserver:
    core處理資料的輸入輸出,從不同的資料來源獲取資料(RDD、Parquet、json等),將查詢結果輸出成schemaRDD;
    catalyst處理查詢語句的整個處理過程,包括解析、繫結、優化、物理計劃等,說其是優化器,還不如說是查詢引擎;
    hive對hive資料的處理
    hive-ThriftServer提供CLI和JDBC/ODBC介面
在這四個模組中,catalyst處於最核心的部分,其效能優劣將影響整體的效能。由於發展時間尚短,還有很多不足的地方,但其外掛式的設計,為未來的發展留下了很大的空間。

從上圖看,catalyst主要的實現元件有:
    1.sqlParse,完成sql語句的語法解析功能,目前只提供了一個簡單的sql解析器;
    2.Analyzer,主要完成繫結工作,將不同來源的Unresolved LogicalPlan和資料元資料(如hive metastore、Schema catalog)進行繫結,生成resolved LogicalPlan;
    3.optimizer對resolved LogicalPlan進行優化,生成optimized LogicalPlan;
    4.Planner將LogicalPlan轉換成PhysicalPlan;、
    5.CostModel,主要根據過去的效能統計資料,選擇最佳的物理執行計劃

這些元件的基本實現方法:
    1.先將sql語句通過解析生成Tree,然後在不同階段使用不同的Rule應用到Tree上,通過轉換完成各個元件的功能。
    2.Analyzer使用Analysis Rules,配合資料元資料(如hive metastore、Schema catalog),完善Unresolved LogicalPlan的屬性而轉換成resolved LogicalPlan;
    3.optimizer使用Optimization Rules,對resolved LogicalPlan進行合併、列裁剪、過濾器下推等優化作業而轉換成optimized LogicalPlan;
    4.Planner使用Planning Strategies,對optimized LogicalPlan

 

執行優化

為了說明查詢優化,我們來看下圖展示的人口資料分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的資料集。如果我們能將filter下推到 join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基於關係代數的等價變換,將高成本的操作替換為低成本操作的過程。

得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的資料來源的特性將過濾條件下推至資料來源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。

 

DataFrame初探

在Spark中,DataFrame是一種以RDD為基礎的分散式資料集,類似於傳統資料庫中的二維表格。DataFrame與RDD的主要區別在於,前者帶有schema元資訊,即DataFrame所表示的二維表資料集的每一列都帶有名稱和型別。這使得Spark SQL得以洞察更多的結構資訊,從而對藏於DataFrame背後的資料來源以及作用於DataFrame之上的變換進行了針對性的優化,最終達到大幅提升執行時效率的目標。反觀RDD,由於無從得知所存資料元素的具體內部結構,Spark Core只能在stage層面進行簡單、通用的流水線優化。

DataFrame的特性

能夠將單個節點叢集上的大小為Kilobytes到Petabytes的資料處理為大型叢集。
支援不同的資料格式(Avro,csv,彈性搜尋和Cassandra)和儲存系統(HDFS,HIVE表,mysql等)。
通過Spark SQL Catalyst優化器(樹變換框架)的最先進的優化和程式碼生成。
可以通過Spark-Core輕鬆地與所有大資料工具和框架整合。
提供用於Python,Java,Scala和R程式設計的API。

建立DataFrame
在Spark SQL中,開發者可以非常便捷地將各種內、外部的單機、分散式資料轉換為DataFrame。

# 從Hive中的users表構造
DataFrame users = sqlContext.table("users") 
# 載入S3上的JSON檔案 
logs = sqlContext.load("s3n://path/to/data.json", "json") 
# 載入HDFS上的Parquet檔案 
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet") 
# 通過JDBC訪問MySQL comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user") 
# 將普通RDD轉變為

DataFrame rdd = sparkContext.textFile("article.txt") \ 
                   .flatMap(lambda line: line.split()) \ 
                   .map(lambda word: (word, 1)) \ 
                   .reduceByKey(lambda a, b: a + b) \ 
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"]) 

# 將本地資料容器轉變為

DataFrame data = [("Alice", 21), ("Bob", 24)] 
people = sqlContext.createDataFrame(data, ["name", "age"]) 

使用DataFrame

和R、Pandas類似,Spark DataFrame也提供了一整套用於操縱資料的DSL。這些DSL在語義上與SQL關係查詢非常相近(這也是Spark SQL能夠為DataFrame提供無縫支援的重要原因之一) 。

# 建立一個只包含"年輕"使用者的DataFrame
df = users.filter(users.age < 21)

# 也可以使用Pandas風格的語法
df = users[users.age < 21]

# 將所有人的年齡加1
df.select(young.name, young.age + 1)

# 統計年輕使用者中各性別人數
df.groupBy("gender").count()

# 將所有年輕使用者與另一個名為logs的DataFrame聯接起來
df.join(logs, logs.userId == users.userId, "left_outer")

儲存DataFrame
當資料分析邏輯編寫完畢後,我們便可以將最終結果儲存下來或展現出來。

# 追加至HDFS上的Parquet檔案

df.save(path="hdfs://path/to/data.parquet",
           source="parquet",
           mode="append")

# 覆寫S3上的JSON檔案

df.save(path="s3n://path/to/data.json",
           source="json",
           mode="append")

# 儲存為SQL表
df.saveAsTable(tableName="young", source="parquet" mode="overwrite")

# 轉換為Pandas DataFrame(Python API特有功能)
pandasDF = young.toPandas()

# 以表格形式列印輸出
df.show()