Spark SQL原始碼解析(三)Analysis階段分析
阿新 • • 發佈:2020-04-28
Spark SQL原理解析前言:
[Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html)
[Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://www.cnblogs.com/listenfwind/p/12735833.html)
# Analysis階段概述
首先,這裡需要引入一個新概念,前面介紹SQL parse階段,會使用antlr4,將一條SQL語句解析成語法樹,然後使用antlr4的訪問者模式遍歷生成語法樹,也就是Logical Plan。但其實,SQL parse這一階段生成的Logical Plan是被稱為Unresolved Logical Plan。所謂unresolved,就是說SQL語句中的物件都是未解釋的。
比如說一條語句**SELECT col FROM sales**,當我們不知道col的具體型別(Int,String,還是其他),甚至是否在sales表中有col這一個列的時候,就稱之為是Unresolved的。
而在analysis階段,主要就是解決這個問題,也就是將Unresolved的變成Resolved的。Spark SQL通過使用Catalyst rule和Catalog來跟蹤資料來源的table資訊。並對Unresolved應用如下的rules(rule可以理解為一條一條的規則,當匹配到樹某些節點的時候就會被應用)。
- 從Catalog中,查詢Unresolved Logical Plan中對應的關係(relations)
- 根據輸入屬性名(比如上述的col列),對映到具體的屬性
- 確定哪些屬性引用相同的值並賦予它們唯一的ID(這個是論文中的內容,看不是很明白,不過主要是方便後面優化器實現的)
- Propagating and coercing types through expressions,這個看著也是有點迷,大概是對資料進行強制轉換,方便後續對1 + col 這樣的資料進行處理。
而處理過後,就會真正生成一棵Resolved Logical Plan,接下來就去看看原始碼裡面是怎麼實現的吧。
# Analysis階段詳細解析
通過跟蹤呼叫程式碼,在呼叫完SQL parse的內容後,就會跑去org.apache.spark.sql.execution.QueryExecution這個類中執行,後面包括Logical Optimization階段,Physical Planning階段,生成RDD任務階段都是在這個類中進行排程的。不過此次只介紹Analysis。
在QueryExecution中,會去呼叫org.apache.spark.sql.catalyst.Analyzer這個類,這個類是繼承自org.apache.spark.sql.catalyst.rules.RuleExecutor,記住這個,後面還有很多個階段都是通過繼承這個類實現的,實現原理也和Analysis階段相似。
繼承自RuleExecutor的類,包括這裡的Analyzer類,都是在自身實現大量的rule,然後註冊到batch變數中,這裡大概貼點程式碼瞅瞅。
```
class Analyzer(
catalog: SessionCatalog,
conf: SQLConf,
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {
......其他程式碼
lazy val batches: Seq[Batch] = Seq(
Batch("Hints", fixedPoint,
new ResolveHints.ResolveBroadcastHints(conf),
ResolveHints.ResolveCoalesceHints,
ResolveHints.RemoveAllHints),
Batch("Simple Sanity Check", Once,
LookupFunctions),
Batch("Substitution", fixedPoint,
CTESubstitution,
WindowsSubstitution,
EliminateUnions,
new SubstituteUnresolvedOrdinals(conf)),
......其他程式碼
```
先大概說下batches這個變數吧,batches是由Batch的列表構成。而Batch的具體簽名如下:
```
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
......其他程式碼
protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
......其他程式碼
}
```
一個Batch由策略Strategy,和一組Rule構成,其中策略Strategy主要是區分迭代次數用的,按我的理解,某些rule可以迭代多次,越多次效果會越好,類似機器學習的學習過程。而策略Strategy會規定迭代一次還是固定次數。而rule就是具體的應用規則了,這個先略過。
在Analyzer這個類中,你會發現很大篇幅的程式碼都是各種各樣rule的實現。然後最終,Analyzer會去呼叫super.execute()方法,也就是呼叫父類(RuleExecutor)的方法執行具體邏輯。而父類又會去呼叫這個batches變數,迴圈來與Sql Parse階段生成的Unresolved Logical Plan做匹配,匹配到了就執行具體的驗證。還是貼下程式碼看看吧。
```
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
def execute(plan: TreeType): TreeType = {
var curPlan = plan
val queryExecutionMetrics = RuleExecutor.queryExecutionMeter
//遍歷Analyzer中定義的batchs變數
batches.foreach { batch =>
val batchStartPlan = curPlan
var iteration = 1
var lastPlan = curPlan
var continue = true
// Run until fix point (or the max number of iterations as specified in the strategy.
//這裡的continue決定是否再次迴圈,由batch的策略(固定次數或單次),以及該batch對plan的作用效果這兩者控制
while (continue) {
//呼叫foldLeft讓batch中每條rule應用於plan,然後就是執行對應rule規則邏輯了
curPlan = batch.rules.foldLeft(curPlan) {
case (plan, rule) =>
val startTime = System.nanoTime()
val result = rule(plan)
val runTime = System.nanoTime() - startTime
if (!result.fastEquals(plan)) {
queryExecutionMetrics.incNumEffectiveExecution(rule.ruleName)
queryExecutionMetrics.incTimeEffectiveExecutionBy(rule.ruleName, runTime)
logTrace(
s"""
|=== Applying Rule ${rule.ruleName} ===
|${sideBySide(plan.treeString, result.treeString).mkString("\n")}
""".stripMargin)
}
queryExecutionMetrics.incExecutionTimeBy(rule.ruleName, runTime)
queryExecutionMetrics.incNumExecution(rule.ruleName)
// Run the structural integrity checker against the plan after each rule.
if (!isPlanIntegral(result)) {
val message = s"After applying rule ${rule.ruleName} in batch ${batch.name}, " +
"the structural integrity of the plan is broken."
throw new TreeNodeException(result, message, null)
}
result
}
iteration += 1
//策略的生效地方
if (iteration > batch.strategy.maxIterations) {
// Only log if this is a rule that is supposed to run more than once.
if (iteration != 2) {
val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
if (Utils.isTesting) {
throw new TreeNodeException(curPlan, message, null)
} else {
logWarning(message)
}
}
continue = false
}
if (curPlan.fastEquals(lastPlan)) {
logTrace(
s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
continue = false
}
lastPlan = curPlan
}
if (!batchStartPlan.fastEquals(curPlan)) {
logDebug(
s"""
|=== Result of Batch ${batch.name} ===
|${sideBySide(batchStartPlan.treeString, curPlan.treeString).mkString("\n")}
""".stripMargin)
} else {
logTrace(s"Batch ${batch.name} has no effect.")
}
}
curPlan
}
}
```
其實這個類的邏輯不難懂,就是遍歷batchs變數,而每個batch又會去使用scala的foldLeft函式,遍歷應用裡面的每條rule。然後根據Batch的策略以及將新生成的Plan與舊的Plan比較,決定是否要再次遍歷。然後最後將新生成的Plan輸出。
如果不清楚scala的foldLeft函式內容,可以百度下看看,不難懂的。然後跟RuleExecutor有關的基本都是這個套路,區別只在於rule的不同。
接下來我們來看看具體是如果應用一條rule,將Unresolved LogicalPlan轉換成Resolved LogicalPlan吧。
# Rule介紹
前面說到,在Analyzer中重寫了Batchs變數,Batchs包含多個Batch,每個Batch又有多個Rule,所以不可能全部看過來,慶幸的是,要了解Unresolved LogicalPlan轉換成Resolved LogicalPlan,只需要看一個Rule就行,那就是ResolveRelations這個Rule,我們就只介紹這個Rule來管中窺豹。
各自Rule基本都是object型別,也就是靜態的,且繼承自Rule這個抽象類,Rule很簡單,就一個ruleName變數喝一個apply方法用以實現邏輯,然後就沒了。所以重點還是在繼承後的實現邏輯。
前面提到,從Unresolved到Resolved的過程,可以理解為就是將SQL語句中的型別和欄位,對映到實體表中的欄位資訊。而儲存實體表元資料資訊的,是Catalog,到具體的類,是org.apache.spark.sql.catalyst.catalog.SessionCatalog。
我們來看看具體的邏輯程式碼:
```
object ResolveRelations extends Rule[LogicalPlan] {
......其他程式碼
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
EliminateSubqueryAliases(lookupTableFromCatalog(u)) match {
case v: View =>
u.failAnalysis(s"Inserting into a view is not allowed. View: ${v.desc.identifier}.")
case other => i.copy(table = other)
}
case u: UnresolvedRelation => resolveRelation(u)
}
......其他程式碼
}
```
邏輯其實也蠻簡單的,就是匹配UnresolvedRelation(就是Unresolved的節點),然後遞迴去Catlog中獲取對應的元資料資訊,遞迴將它及子節點變成Resoulved。不過還有個要提的是,SQL中對應的,有可能是檔案資料,或是資料庫中的表,抑或是檢視(view),針對檔案資料是不會轉換的,轉換成Resolved會在後面進行。而表和檢視則會立即轉換。
最後,接上一篇的例子,接著來看看,經過Analysis階段後,LogicalPlan變成什麼樣吧,上一篇SQL parse使用的示例程式碼:
```
//生成DataFrame
val df = Seq((1, 1)).toDF("key", "value")
df.createOrReplaceTempView("src")
//呼叫spark.sql
val queryCaseWhen = sql("select key from src ")
```
經過上次介紹的SQL parse後是變成這樣:
```
'Project ['key]
+- 'UnresolvedRelation `src`
```
這裡的涵義上篇已介紹,不再贅述,而經過本次的Analysis後,則會變成這樣
```
Project [key#5]
+- SubqueryAlias `src`
+- Project [_1#2 AS key#5, _2#3 AS value#6]
+- LocalRelation [_1#2, _2#3]
```
可以發現,主要就是對UnresolvedRelation進行展開,現在我們可以發現src有兩個欄位,分別是key和value及其對應的別名(1#2,2#3)。這裡還有一個SubqueryAlias,這個我也不是很明白,按原始碼裡面的說法,這裡的subquery僅用來提供屬性的作用域資訊,Analysis階段過後就就可以將其刪除,所以在Optimization階段後會發現SubqueryAlias消失了。
### 小結
OK,那今天就先介紹到這裡吧,主要綜述了Analysis的內容,然後介紹RuleExecution的邏輯,最後簡單看了個Rule的具體內容以及承接SQL parse階段的例子。有興趣的童鞋可以自己去順著思路翻原始碼看看。