Spark SQL / Catalyst 內部原理 與 RBO
原創文章,轉載請務必將下面這段話置於文章開頭處。
本文轉發自技術世界,原文鏈接 http://www.jasongj.com/spark/rbo/
本文所述內容均基於 2018年9月10日 Spark 最新 Release 2.3.1 版本。後續將持續更新
Spark SQL 架構
Spark SQL 的整體架構如下圖所示
從上圖可見,無論是直接使用 SQL 語句還是使用 DataFrame,都會經過如下步驟轉換成 DAG 對 RDD 的操作
- Parser 解析 SQL,生成 Unresolved Logical Plan
- 由 Analyzer 結合 Catalog 信息生成 Resolved Logical Plan
- Optimizer根據預先定義好的規則對 Resolved Logical Plan 進行優化並生成 Optimized Logical Plan
- Query Planner 將 Optimized Logical Plan 轉換成多個 Physical Plan
- CBO 根據 Cost Model 算出每個 Physical Plan 的代價並選取代價最小的 Physical Plan 作為最終的 Physical Plan
- Spark 以 DAG 的方法執行上述 Physical Plan
- 在執行 DAG 的過程中,Adaptive Execution 根據運行時信息動態調整執行計劃從而提高執行效率
Parser
Spark SQL 使用 Antlr 進行記法和語法解析,並生成 UnresolvedPlan。
當用戶使用 SparkSession.sql(sqlText : String) 提交 SQL 時,SparkSession 最終會調用 SparkSqlParser 的 parsePlan 方法。該方法分兩步
- 使用 Antlr 生成的 SqlBaseLexer 對 SQL 進行詞法分析,生成 CommonTokenStream
- 使用 Antlr 生成的 SqlBaseParser 進行語法分析,得到 LogicalPlan
現在兩張表,分別定義如下
CREATE TABLE score ( id INT, math_score INT, english_score INT )
CREATE TABLE people (
id INT,
age INT,
name INT
)
對其進行關聯查詢如下
SELECT sum(v)
FROM (
SELECT score.id,
100 + 80 + score.math_score + score.english_score AS v
FROM people
JOIN score
ON people.id = score.id
AND people.age > 10
) tmp
生成的 UnresolvedPlan 如下圖所示。
從上圖可見
- 查詢涉及的兩張表,被解析成了兩個 UnresolvedRelation,也即只知道這們是兩張表,卻並不知道它們是 EXTERNAL TABLE 還是 MANAGED TABLE,也不知道它們的數據存在哪兒,更不知道它們的表結構如何
- sum(v) 的結果未命名
- Project 部分只知道是選擇出了屬性,卻並不知道這些屬性屬於哪張表,更不知道其數據類型
- Filter 部分也不知道數據類型
Spark SQL 解析出的 UnresolvedPlan 如下所示
== Parsed Logical Plan ==
‘Project [unresolvedalias(‘sum(‘v), None)]
+- ‘SubqueryAlias tmp
+- ‘Project [‘score.id, (((100 + 80) + ‘score.math_score) + ‘score.english_score) AS v#493]
+- ‘Filter ((‘people.id = ‘score.id) && (‘people.age > 10))
+- ‘Join Inner
:- ‘UnresolvedRelation `people`
+- ‘UnresolvedRelation `score`
Analyzer
從 Analyzer 的構造方法可見
- Analyzer 持有一個 SessionCatalog 對象的引用
- Analyzer 繼承自 RuleExecutor[LogicalPlan],因此可對 LogicalPlan 進行轉換
class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
Analyzer 包含了如下的轉換規則
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions ::
ResolveRelations ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
ResolveNewInstance ::
ResolveUpCast ::
ResolveGroupingAnalytics ::
ResolvePivot ::
ResolveOrdinalInOrderByAndGroupBy ::
ResolveAggAliasInGroupBy ::
ResolveMissingReferences ::
ExtractGenerator ::
ResolveGenerate ::
ResolveFunctions ::
ResolveAliases ::
ResolveSubquery ::
ResolveSubqueryColumnAliases ::
ResolveWindowOrder ::
ResolveWindowFrame ::
ResolveNaturalAndUsingJoin ::
ExtractWindowExpressions ::
GlobalAggregates ::
ResolveAggregateFunctions ::
TimeWindowing ::
ResolveInlineTables(conf) ::
ResolveTimeZone(conf) ::
ResolvedUuidExpressions ::
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
HandleNullInputsForUDF),
Batch("FixNullability", Once,
FixNullability),
Batch("Subquery", Once,
UpdateOuterReferences),
Batch("Cleanup", fixedPoint,
CleanupAliases)
)
例如, ResolveRelations 用於分析查詢用到的 Table 或 View。本例中 UnresolvedRelation (people) 與 UnresolvedRelation (score) 被解析為 HiveTableRelation (json.people) 與 HiveTableRelation (json.score),並列出其各自包含的字段名。
經 Analyzer 分析後得到的 Resolved Logical Plan 如下所示
== Analyzed Logical Plan ==
sum(v): bigint
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- SubqueryAlias tmp
+- Project [id#500, (((100 + 80) + math_score#501) + english_score#502) AS v#493]
+- Filter ((id#496 = id#500) && (age#497 > 10))
+- Join Inner
:- SubqueryAlias people
: +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
+- SubqueryAlias score
+- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]
Analyzer 分析前後的 LogicalPlan 對比如下
由上圖可見,分析後,每張表對應的字段集,字段類型,數據存儲位置都已確定。Project 與 Filter 操作的字段類型以及在表中的位置也已確定。
有了這些信息,已經可以直接將該 LogicalPlan 轉換為 Physical Plan 進行執行。
但是由於不同用戶提交的 SQL 質量不同,直接執行會造成不同用戶提交的語義相同的不同 SQL 執行效率差距甚遠。換句話說,如果要保證較高的執行效率,用戶需要做大量的 SQL 優化,使用體驗大大降低。
為了盡可能保證無論用戶是否熟悉 SQL 優化,提交的 SQL 質量如何, Spark SQL 都能以較高效率執行,還需在執行前進行 LogicalPlan 優化。
Optimizer
Spark SQL 目前的優化主要是基於規則的優化,即 RBO (Rule-based optimization)
- 每個優化以 Rule 的形式存在,每條 Rule 都是對 Analyzed Plan 的等價轉換
- RBO 設計良好,易於擴展,新的規則可以非常方便地嵌入進 Optimizer
- RBO 目前已經足夠好,但仍然需要更多規則來 cover 更多的場景
- 優化思路主要是減少參與計算的數據量以及計算本身的代價
PushdownPredicate
PushdownPredicate 是最常見的用於減少參與計算的數據量的方法。
前文中直接對兩表進行 Join 操作,然後再 進行 Filter 操作。引入 PushdownPredicate 後,可先對兩表進行 Filter 再進行 Join,如下圖所示。
當 Filter 可過濾掉大部分數據時,參與 Join 的數據量大大減少,從而使得 Join 操作速度大大提高。
這裏需要說明的是,此處的優化是 LogicalPlan 的優化,從邏輯上保證了將 Filter 下推後由於參與 Join 的數據量變少而提高了性能。另一方面,在物理層面,Filter 下推後,對於支持 Filter 下推的 Storage,並不需要將表的全量數據掃描出來再過濾,而是直接只掃描符合 Filter 條件的數據,從而在物理層面極大減少了掃描表的開銷,提高了執行速度。
ConstantFolding
本文的 SQL 查詢中,Project 部分包含了 100 + 800 + match_score + english_score 。如果不進行優化,那如果有一億條記錄,就會計算一億次 100 + 80,非常浪費資源。因此可通過 ConstantFolding 將這些常量合並,從而減少不必要的計算,提高執行速度。
ColumnPruning
在上圖中,Filter 與 Join 操作會保留兩邊所有字段,然後在 Project 操作中篩選出需要的特定列。如果能將 Project 下推,在掃描表時就只篩選出滿足後續操作的最小字段集,則能大大減少 Filter 與 Project 操作的中間結果集數據量,從而極大提高執行速度。
這裏需要說明的是,此處的優化是邏輯上的優化。在物理上,Project 下推後,對於列式存儲,如 Parquet 和 ORC,可在掃描表時就只掃描需要的列而跳過不需要的列,進一步減少了掃描開銷,提高了執行速度。
經過如上優化後的 LogicalPlan 如下
== Optimized Logical Plan ==
Aggregate [sum(cast(v#493 as bigint)) AS sum(v)#504L]
+- Project [((180 + math_score#501) + english_score#502) AS v#493]
+- Join Inner, (id#496 = id#500)
:- Project [id#496]
: +- Filter ((isnotnull(age#497) && (age#497 > 10)) && isnotnull(id#496))
: +- HiveTableRelation `jason`.`people`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#496, age#497, name#498]
+- Filter isnotnull(id#500)
+- HiveTableRelation `jason`.`score`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [id#500, math_score#501, english_score#502]
SparkPlanner
得到優化後的 LogicalPlan 後,SparkPlanner 將其轉化為 SparkPlan 即物理計劃。
本例中由於 score 表數據量較小,Spark 使用了 BroadcastJoin。因此 score 表經過 Filter 後直接使用 BroadcastExchangeExec 將數據廣播出去,然後結合廣播數據對 people 表使用 BroadcastHashJoinExec 進行 Join。再經過 Project 後使用 HashAggregateExec 進行分組聚合。
至此,一條 SQL 從提交到解析、分析、優化以及執行的完整過程就介紹完畢。
本文介紹的 Optimizer 屬於 RBO,實現簡單有效。它屬於 LogicalPlan 的優化,所有優化均基於 LogicalPlan 本身的特點,未考慮數據本身的特點,也未考慮算子本身的代價。下文將介紹 CBO,它充分考慮了數據本身的特點(如大小、分布)以及操作算子的特點(中間結果集的分布及大小)及代價,從而更好的選擇執行代價最小的物理執行計劃,即 SparkPlan。
Spark SQL / Catalyst 內部原理 與 RBO