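Key Source Code Analysis of SQL Physical Execution Strategies in Spark 2.1 and 2.2
1. Before We Begin
Here is the SQL statement that the rest of this article is built around. It uses the TPC-DS table schema.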
- SQL:
SQL> select avg(cs_ext_discount_amt) from catalog_sales, date_dim where d_date between '1999-02-22' and cast('1999-05-22' as date) and d_date_sk = cs_sold_date_sk group by cs_sold_date_sk;
- Logical plan:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
2. Physical Plan Source Code Analysis
2.1 Physical Strategies
def strategies: Seq[Strategy] =
  extraStrategies ++ (
    FileSourceStrategy ::
    DataSourceStrategy ::
    DDLStrategy ::
    SpecialLimits ::
    Aggregation ::
    JoinSelection ::
    InMemoryScans ::
    BasicOperators :: Nil)
Here, extraStrategies is an extension point that lets external developers add their own strategies. The code that invokes these strategies is:
// Collect physical plan candidates.
val candidates = strategies.iterator.flatMap(_(plan))
Each strategy is applied to the logical plan in turn, and the results are flattened into a single iterator of PhysicalPlan. So what does each strategy do?
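To make the flatMap step concrete, here is a minimal sketch in plain Scala. It does not use Spark: Logical, Physical, Strategy, scanOnly, and joinOnly are hypothetical stand-ins in which a plan is just a string. It only illustrates how each strategy returns zero or more candidates and how the iterator flattens them.

```scala
// Toy model of `strategies.iterator.flatMap(_(plan))`.
// Every name here is a hypothetical stand-in, not a Spark class.
object StrategyDemo {
  // In this sketch a "logical plan" is just an operator name...
  type Logical = String
  // ...and a "physical plan" is the name of the operator that would run it.
  type Physical = String
  // A strategy returns zero or more physical candidates for a logical plan.
  type Strategy = Logical => Seq[Physical]

  // Only knows how to plan a relation.
  val scanOnly: Strategy = {
    case "Relation" => Seq("FileScan")
    case _          => Nil
  }

  // Only knows how to plan a join, and offers two alternatives.
  val joinOnly: Strategy = {
    case "Join" => Seq("BroadcastHashJoin", "SortMergeJoin")
    case _      => Nil
  }

  val strategies: Seq[Strategy] = Seq(scanOnly, joinOnly)

  // Apply every strategy and flatten the results into one iterator.
  def candidates(plan: Logical): Iterator[Physical] =
    strategies.iterator.flatMap(s => s(plan))
}
```

With these toy strategies, StrategyDemo.candidates("Join") yields both join candidates, while a plan that no strategy recognizes yields an empty iterator.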
2.1.1 FileSourceStrategy
A strategy for Hadoop file systems. It applies when the Relation at the bottom of the plan is a HadoopFsRelation, and it plans the file scan.
2.1.2 DataSourceStrategy
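Spark predefines four scan interfaces for DataSource: TableScan, PrunedScan, PrunedFilteredScan, and CatalystScan (CatalystScan is unstable and rarely used). If a DataSource written by a developer (user) implements one of these four interfaces, this strategy is invoked when planning reaches the Relation at the bottom of the plan, and it plans the scan of the underlying data.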
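The difference between these interfaces is how much work the planner can hand to the source. The sketch below imitates the shape of PrunedFilteredScan with hypothetical in-memory stand-ins (Row as a Map, a single GreaterThan filter, a PrunedFilteredScanLike trait). The real interfaces live in org.apache.spark.sql.sources and return an RDD[Row], which this sketch deliberately avoids so that it runs without Spark.

```scala
// Simplified stand-ins, NOT the Spark API: rows are Maps and there is no RDD.
object ScanDemo {
  type Row = Map[String, Any]

  // Hypothetical stand-in for a pushed-down filter.
  final case class GreaterThan(column: String, value: Int)

  // Shaped like PrunedFilteredScan: the source is told which columns are
  // required and which filters apply, so it can drop data before returning it.
  trait PrunedFilteredScanLike {
    def buildScan(requiredColumns: Seq[String], filters: Seq[GreaterThan]): Seq[Row]
  }

  class InMemorySource(data: Seq[Row]) extends PrunedFilteredScanLike {
    def buildScan(requiredColumns: Seq[String], filters: Seq[GreaterThan]): Seq[Row] =
      data
        // Keep only rows that satisfy every pushed-down filter.
        .filter(row => filters.forall(f => row(f.column).asInstanceOf[Int] > f.value))
        // Keep only the requested columns.
        .map(row => row.filter { case (name, _) => requiredColumns.contains(name) })
  }
}
```

A source shaped like TableScan would take no arguments and return every column of every row, leaving both the projection and the filtering to Spark.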
2.1.3 DDLStrategy (present in 2.1, removed in 2.2)
Invoked for create table statements. Since it no longer exists in later versions, it is not covered here.
2.1.4 SpecialLimits
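Invoked when a Spark SQL query includes limit n. (Even when no limit is written, an action such as show() applies one: it displays 20 rows by default.) In the source code, each limit case wraps the child of the limit node in a PlanLater, a rather magical thing that is explained below.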
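One likely reason the plan later in this article shows Limit 21 rather than 20: Dataset.show() displays 20 rows by default but fetches one extra row so that it can report whether more rows exist. The sketch below illustrates that fetch-one-extra idea in plain Scala. firstRows is a hypothetical helper, not Spark code, and the explanation assumes the query was run through show().

```scala
// Hypothetical helper illustrating "ask for numRows + 1, display numRows".
object ShowDemo {
  // Returns the rows to display and a flag saying whether more rows exist.
  def firstRows[A](rows: Iterator[A], numRows: Int = 20): (List[A], Boolean) = {
    // Fetch one row more than will be displayed (this is the "limit 21").
    val taken = rows.take(numRows + 1).toList
    // If the extra row arrived, the result has more data than is shown.
    (taken.take(numRows), taken.length > numRows)
  }
}
```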
2.1.5 Aggregation
As the name suggests, this is the strategy that plans aggregate functions.
2.1.6 JoinSelection
The strategy that plans joins. Join execution is likewise divided into BroadcastJoin (that is, a map-side join) and ShuffledJoin; a later article will cover this in detail.
2.1.7 InMemoryScans
Used when the data has already been cached in memory.
2.1.8 BasicOperators
Planning strategies for basic operations such as flatMap, sort, and project. In practice, most of them simply wrap the children of these nodes in a PlanLater.
2.2 PlanLater
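PlanLater is a very important concept in Spark SQL physical planning. Its literal meaning is easy to grasp: plan this part later. So after the strategies above have each been applied, what does the original logical plan look like?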
ReturnAnswer
+- GlobalLimit 21
   +- LocalLimit 21
      +- PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
      , Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
         +- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
         , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
            +- PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
               :- PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
               , Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
               :  +- Filter isnotnull(cs_sold_date_sk#24)
               :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
               +- PlanLater Project [d_date_sk#58]
               , Project [d_date_sk#58]
                  +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
                     +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
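What has changed? Two main things:
- ReturnAnswer and Limit nodes have been added at the top.
- The Aggregate, Project, and Join nodes are all wrapped in PlanLater. (The Filter nodes could be wrapped in PlanLater as well, but the logical plan has already pushed the Filters down to the bottom, so the three-level Project->Filter->Relation group at the very bottom can be planned directly by a single strategy. Only the top of the three levels, the Project node, therefore needs a PlanLater.)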
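Back to the main point. The ReturnAnswer and Limit nodes added at the top of the tree are easy to understand: Limit is the default row limit applied to the query (GlobalLimit 21 and LocalLimit 21 in the plan above), and ReturnAnswer marks the point where the result is returned to the caller. So what do the added PlanLater nodes do? My understanding is that they split the physical plan into segments, and each segment is planned by its corresponding strategy. The source code is as follows: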
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...

  // Collect physical plan candidates.
  val candidates = strategies.iterator.flatMap(_(plan))

  // The candidates may contain placeholders marked as [[planLater]],
  // so try to replace them by their child plans.
  val plans = candidates.flatMap { candidate =>
    val placeholders = collectPlaceholders(candidate)

    if (placeholders.isEmpty) {
      // Take the candidate as is because it does not contain placeholders.
      Iterator(candidate)
    } else {
      // Plan the logical plan marked as [[planLater]] and replace the placeholders.
      placeholders.iterator.foldLeft(Iterator(candidate)) {
        case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
          // Plan the logical plan for the placeholder.
          val childPlans = this.plan(logicalPlan)

          candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
            childPlans.map { childPlan =>
              // Replace the placeholder by the child plan
              candidateWithPlaceholders.transformUp {
                case p if p == placeholder => childPlan
              }
            }
          }
      }
    }
  }

  val pruned = prunePlans(plans)
  assert(pruned.hasNext, s"No plan for $plan")
  pruned
}
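As the code shows, iterating over the strategies and flattening their results produces the candidates (usually there is only one, a plan whose children are still PlanLater placeholders). The placeholders (which are simply the PlanLater nodes) are then collected, and for each placeholder the plan method is called recursively on the logical plan it wraps. For the SQL statement in this article, the recursive calls to plan yield the following placeholders and their logical plans (that is, the placeholder and logicalPlan variables in case (candidatesWithPlaceholders, (placeholder, logicalPlan))):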
placeholder:
PlanLater Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
logicalPlan:
Aggregate [cs_sold_date_sk#24], [cast((avg(UnscaledValue(cs_ext_discount_amt#46)) / 100.0) as decimal(11,6)) AS avg(cs_ext_discount_amt)#149]
+- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   +- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
      :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
      :  +- Filter isnotnull(cs_sold_date_sk#24)
      :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
      +- Project [d_date_sk#58]
         +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
            +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
   :- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
   :  +- Filter isnotnull(cs_sold_date_sk#24)
   :     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
   +- Project [d_date_sk#58]
      +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
         +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
logicalPlan:
Join Inner, (d_date_sk#58 = cs_sold_date_sk#24)
:- Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
:  +- Filter isnotnull(cs_sold_date_sk#24)
:     +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
+- Project [d_date_sk#58]
   +- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
      +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
placeholder:
PlanLater Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
logicalPlan:
Project [cs_sold_date_sk#24, cs_ext_discount_amt#46]
+- Filter isnotnull(cs_sold_date_sk#24)
   +- Relation[cs_sold_date_sk#24,cs_sold_time_sk#25,cs_ship_date_sk#26,cs_bill_customer_sk#27,cs_bill_cdemo_sk#28,cs_bill_hdemo_sk#29,cs_bill_addr_sk#30,cs_ship_customer_sk#31,cs_ship_cdemo_sk#32,cs_ship_hdemo_sk#33,cs_ship_addr_sk#34,cs_call_center_sk#35,cs_catalog_page_sk#36,cs_ship_mode_sk#37,cs_warehouse_sk#38,cs_item_sk#39,cs_promo_sk#40,cs_order_number#41,cs_quantity#42,cs_wholesale_cost#43,cs_list_price#44,cs_sales_price#45,cs_ext_discount_amt#46,cs_ext_sales_price#47,... 10 more fields]
placeholder:
PlanLater Project [d_date_sk#58]
logicalPlan:
Project [d_date_sk#58]
+- Filter (((isnotnull(d_date#60) && (cast(d_date#60 as string) >= 1999-02-22)) && (d_date#60 <= 10733)) && isnotnull(d_date_sk#58))
   +- Relation[d_date_sk#58,d_date_id#59,d_date#60,d_month_seq#61,d_week_seq#62,d_quarter_seq#63,d_year#64,d_dow#65,d_moy#66,d_dom#67,d_qoy#68,d_fy_year#69,d_fy_quarter_seq#70,d_fy_week_seq#71,d_day_name#72,d_quarter_name#73,d_holiday#74,d_weekend#75,d_following_holiday#76,d_first_dom#77,d_last_dom#78,d_same_day_ly#79,d_same_day_lq#80,d_current_day#81,... 4 more fields]
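As you can see, the recursion bottoms out at the three-level Project->Filter->Relation group. Because I had actually written my own DataSource, DataSourceStrategy is invoked at this point to plan the read of the data. The recursion then returns level by level, and at each PlanLater split point the corresponding strategy supplies the physical operator that processes the data for that segment.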
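The recursion described above can be sketched without Spark. Everything below is a hypothetical, heavily simplified model: the case classes, the three toy strategies, and the resolve helper are illustrative names, and the Project between Aggregate and Join is omitted. The sketch also takes only the first candidate, whereas the real plan method combines every candidate with every child plan.

```scala
// Toy planner with PlanLater placeholders. Not Spark code; all names are
// hypothetical and the plan shapes are reduced to the bare minimum.
object PlanLaterDemo {
  // --- Logical plan ---
  sealed trait Logical
  final case class Relation(name: String) extends Logical
  final case class Filter(cond: String, child: Logical) extends Logical
  final case class Project(cols: String, child: Logical) extends Logical
  final case class Join(left: Logical, right: Logical) extends Logical
  final case class Aggregate(child: Logical) extends Logical

  // --- Physical plan ---
  sealed trait Physical
  final case class Scan(name: String, cond: String, cols: String) extends Physical
  final case class HashJoin(left: Physical, right: Physical) extends Physical
  final case class HashAggregate(child: Physical) extends Physical
  // Placeholder that remembers the logical subtree still to be planned.
  final case class PlanLater(plan: Logical) extends Physical

  type Strategy = Logical => Seq[Physical]

  // Plans a whole Project -> Filter -> Relation group in a single step.
  val scanStrategy: Strategy = {
    case Project(cols, Filter(cond, Relation(name))) => Seq(Scan(name, cond, cols))
    case _ => Nil
  }
  // Plans only the join itself and defers both children.
  val joinStrategy: Strategy = {
    case Join(l, r) => Seq(HashJoin(PlanLater(l), PlanLater(r)))
    case _ => Nil
  }
  // Plans only the aggregate itself and defers its child.
  val aggStrategy: Strategy = {
    case Aggregate(child) => Seq(HashAggregate(PlanLater(child)))
    case _ => Nil
  }
  val strategies: Seq[Strategy] = Seq(scanStrategy, joinStrategy, aggStrategy)

  // Replace each placeholder by the plan produced for its logical subtree.
  def resolve(p: Physical): Physical = p match {
    case PlanLater(logical) => plan(logical) // recursive call into the planner
    case HashJoin(l, r)     => HashJoin(resolve(l), resolve(r))
    case HashAggregate(c)   => HashAggregate(resolve(c))
    case s: Scan            => s
  }

  // Simplification: take the first candidate only.
  def plan(logical: Logical): Physical = {
    val candidates = strategies.iterator.flatMap(s => s(logical))
    require(candidates.hasNext, s"No plan for $logical")
    resolve(candidates.next())
  }
}
```

Calling PlanLaterDemo.plan on an Aggregate over a Join of two Project->Filter->Relation groups first produces a HashAggregate with a PlanLater child, then recurses into the Join, and finally bottoms out in two Scan nodes, which mirrors the order of the placeholders listed above.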