1. 程式人生 > >Spark SQL原始碼解析(五)SparkPlan準備和執行階段

Spark SQL原始碼解析(五)SparkPlan準備和執行階段

Spark SQL原理解析前言: [Spark SQL原始碼剖析(一)SQL解析框架Catalyst流程概述](https://www.cnblogs.com/listenfwind/p/12724381.html) [Spark SQL原始碼解析(二)Antlr4解析Sql並生成樹](https://www.cnblogs.com/listenfwind/p/12735833.html) [Spark SQL原始碼解析(三)Analysis階段分析](https://www.cnblogs.com/listenfwind/p/12795934.html) [Spark SQL原始碼解析(四)Optimization和Physical Planning階段解析](https://www.cnblogs.com/listenfwind/p/12886205.html) # SparkPlan準備階段介紹 前面經過千辛萬苦,終於生成可實際執行的SparkPlan(即PhysicalPlan)。但在真正執行前,還需要做一些準備工作,包括在必要的地方插入一些shuffle作業,在需要的地方進行資料格式轉換等等。 這部分內容都在org.apache.spark.sql.execution.QueryExecution類中。我們看看程式碼 ``` class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其他程式碼 lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) //呼叫下面的preparations,然後使用foldLeft遍歷preparations中的Rule並應用到SparkPlan protected def prepareForExecution(plan: SparkPlan): SparkPlan = { preparations.foldLeft(plan) { case (sp, rule) => rule.apply(sp) } } /** A sequence of rules that will be applied in order to the physical plan before execution. */ //定義各個Rule protected def preparations: Seq[Rule[SparkPlan]] = Seq( PlanSubqueries(sparkSession), EnsureRequirements(sparkSession.sessionState.conf), CollapseCodegenStages(sparkSession.sessionState.conf), ReuseExchange(sparkSession.sessionState.conf), ReuseSubquery(sparkSession.sessionState.conf)) ......其他程式碼 } ``` 準備階段是去呼叫prepareForExecution方法,而prepareForExecution也簡單,還是我們早先看過的Rule那一套東西。定義一系列的Rule,然後讓Rule去匹配SparkPlan然後轉換一遍。 這裡在於每條Rule都是幹嘛用的,這裡介紹一下吧。 #### PlanSubqueries(sparkSession) 生成子查詢,在比較早的版本,Spark SQL還是不支援子查詢的,不過現在加上了,這條Rule其實是對子查詢的SQL新生成一個QueryExecution(就是我們一直分析的這個流程),還記得QueryExecution裡面的變數基本都是懶載入的吧,這些不會立即執行,都是到最後一併執行的,說白了就有點遞迴的意思。 #### EnsureRequirements(sparkSession.sessionState.conf) 這條是比較重要的,程式碼量也多。主要就是驗證輸出的分割槽(partition)和我們要的分割槽是不是一樣,不一樣那自然需要加入shuffle處理重分割槽,如果有排序需求還會排序。 ### CollapseCodegenStages 這個是和一個優化相關的,先介紹下相關背景。Whole stage Codegen在一些MPP資料庫被用來提高效能,主要就是將一串的運算元,轉換成一段程式碼(Spark sql轉換成java程式碼),從而提高效能。比如下圖,一串的運算元操作,可以轉換成一個java方法,這一一來效能會有一定的提升。 ![](https://img2020.cnblogs.com/blog/1011838/202004/1011838-20200424155608080-1337244720.png) 這一步就是在支援Codegen的SparkPlan上新增一個WholeStageCodegenExec,不支援Codegen的SparkPlan則會新增一個InputAdapter。這一點在下面看preparations階段結果的時候能看到,還有這個優化是預設開啟的。 #### ReuseExchange和ReuseSubquery 這兩個都是大概同樣的功能就放一塊說了。首先Exchange是對shuffle如何進行的描述,可以理解為就是shuffle吧。 這裡的ReuseExchange是一個優化措施,去找有重複的Exchange的地方,然後將結果替換過去,避免重複計算。 ReuseSubquery也是同樣的道理,如果一條SQL語句中有多個相同的子查詢,那麼是不會重複計算的,會將計算的結果直接替換到重複的子查詢中去,提高效能。 這裡我略過了CollapseCodegenStages,這部分比較複雜,也沒什麼時間看,就先跳過了,大概知道這個東西是一個優化措施就行了。 那再來看看這一階段後,示例程式碼會變成什麼樣吧,先看示例程式碼: ``` //生成DataFrame val df = Seq((1, 1)).toDF("key", "value") df.createOrReplaceTempView("src") //呼叫spark.sql val queryCaseWhen = sql("select key from src ") ``` 結果生成如下: ``` Project [_1#2 AS key#5] +- LocalTableScan [_1#2, _2#3] ``` 好吧這裡看還是和之前Optimation階段一樣,不過斷點看就不大一樣了。 ![](https://img2020.cnblogs.com/blog/1011838/202004/1011838-20200424153815248-1329999930.png) 由於我們的SQL比較簡單,所以只多了兩個SparkPlan,就是WholeStageCodegenExec和InputAdapter,和上面說的是一致的! OK,經過以上的準備之後,就要開始最後的執行階段了。 # SparkPlan執行生成RDD階段 依舊是在QueryExecution裡面, ``` class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { ......其他程式碼 lazy val toRdd: RDD[InternalRow] = executedPlan.execute() ......其他程式碼 } ``` 這裡實際上是呼叫了之前生成的SparkPlan的execute()方法,這個方法最終會再呼叫它的doExecute()方法,而這個方法是各個子類自己實現的,也就是說,不同的SparkPlan執行的doExecute()是不一樣的。 通過上面的階段,我們得到了一棵4層的樹,不過其中WholeStageCodegenExec和InputAdapter是為Codegen優化生成的,這裡就不討論了,忽略這兩個其實結果是一樣的。也就是說這裡只介紹ProjectExec和LocalTableScanExec兩個SparkPlan的doExecute()方法。 先是ProjectExec這個SparkPlan,我們看看它的doExecute()程式碼。 ``` case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) extends UnaryExecNode with CodegenSupport { ......其他程式碼 protected override def doExecute(): RDD[InternalRow] = { child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) } } ......其他程式碼 } ``` 可以看到它是先遞迴去呼叫child(也就是LocalTableScanExec)的doExecute()方法,還是得先去看看LocalTableScanExec生成什麼東西呀。 ``` case class LocalTableScanExec( output: Seq[Attribute], @transient rows: Seq[InternalRow]) extends LeafExecNode { ......其他程式碼 private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows, numParallelism) protected override def doExecute(): RDD[InternalRow] = { val numOutputRows = longMetric("numOutputRows") rdd.map { r => numOutputRows += 1 r } } ......其他程式碼 ``` 可以看到最底層的rdd就是在這裡實現的,LocalTableScanExec一開始就會生成一個lazy的rdd,在需要的時候返回。而在doExecute()方法中的numOutputRows可以理解為僅是一個測量值,暫時不用理會。總之這裡我們就發現LocalTableScanExec的doExecute()其實就是返回一個parallelize生成的rdd。然後再回到ProjectExec去。 它呼叫child.execute().mapPartitionsWithIndexInternal {......},這裡的mapPartitionsWithIndexInternal和rdd的mapPartitionsWithIndex是類似的,區別只在於mapPartitionsWithIndexInternal只會在內部模組使用,如果有童鞋不明白mapPartitionsWithIndex這個API,可以百度查檢視。然後重點看mapPartitionsWithIndexInternal的內部邏輯。 ``` child.execute().mapPartitionsWithIndexInternal { (index, iter) => val project = UnsafeProjection.create(projectList, child.output, subexpressionEliminationEnabled) project.initialize(index) iter.map(project) } ``` 這裡最後一行iter.map(project),其實還是scala的語法糖,實際大概是這樣iter.map(i => project.apply(i))。就是呼叫project的apply方法,對每行資料處理。然後通過追蹤,可以發現project的例項是InterpretedUnsafeProjection,我們看看它的apply方法。 ``` class InterpretedUnsafeProjection(expressions: Array[Expression]) extends UnsafeProjection { ......其他程式碼 override def apply(row: InternalRow): UnsafeRow = { // Put the expression results in the intermediate row. var i = 0 while (i < numFields) { values(i) = expressions(i).eval(row) i += 1 } // Write the intermediate row to an unsafe row. rowWriter.reset() writer(intermediate) rowWriter.getRow() } ......其他程式碼 ``` 這裡其實重點在最後三行,就是將結果寫入到result row,再返回回去。當執行完畢的時候,就會得到最終的RDD[InternalRow],再剩下的,就交給spark core去處理了。 ### 小結 OK,那到這裡基本就把Spark整個流程給講完了,回顧一下整個流程。 ![catalyst流程](https://img2020.cnblogs.com/blog/1011838/202004/1011838-20200418114451800-50235194.png) 其實可以發現流程是挺簡單的,很多其他SQL解析框架(比如calcite)也是類似的流程,只是在設計上在某些方面的取捨會有偏差。而後深入到程式碼的時候容易陷入一些細節中,當然這幾篇也省略了很多細節,很多時候細節才是真正精髓的地方,以後有如果涉及到的時候再寫文章討論吧(/偷笑)。如果在開放過程中涉及到SQL解析這方面的開放,應該都會是在優化方面,也就是Optimization階段增加或處理Rule,這塊就需要對代數優化理論和程式碼有一些瞭解了。 限於本人水平,介紹spark sql的這幾篇文章難免有疏漏和不足的地方,歡迎在評論區評論,先謝過了~~