Apache Spark作為編譯器:深入介紹新的Tungsten執行引擎
《Spark 2.0技術預覽:更容易、更快速、更智慧》文中簡單地介紹了Spark 2.0相關技術, 本文將深入介紹新的Tungsten執行引擎。Apache Spark已經非常快了,但是我們能不能讓它再快10倍?
這個問題使得我們從根本上重新思考Spark物理執行層的設計。當你隨便調查一個現代資料引擎(比如Spark、其他的MPP資料庫),你會發現大部分的CPU週期都花費在無用的工作之上,比如虛擬函式的呼叫;或者讀取/寫入中間資料到CPU快取記憶體或記憶體中。通過減少花在這些無用功的CPU週期一直是現代編譯器長期效能優化的重點。
Apache Spark 2.0中附帶了第二代Tungsten engine。這一代引擎是建立在現代編譯器和MPP資料庫的想法上,並且把它們應用於資料的處理過程中。主要想法是通過在執行期間優化那些拖慢整個查詢的程式碼到一個單獨的函式中,消除虛擬函式的呼叫以及利用CPU暫存器來存放那些中間資料。作為這種流線型策略的結果,我們顯著提高CPU效率並且獲得了效能提升,我們把這些技術統稱為”整段程式碼生成”(whole-stage code generation)。
過去:Volcano迭代模型(Volcano Iterator Model)
在我們深入介紹whole-stage code generation之前,讓我們先回顧一下現在的Spark(以及大多數資料庫系統)是如何執行的。讓我們看看這個簡單的查詢:掃描一個表,然後計算出滿足給定條件屬性值的總行數。
為了計算這個查詢,舊版本的Spark(1.x)會利用基於迭代模型的經典查詢評估策略(通常被稱為Volcano model)。在這個模型中,一個查詢由多個運算元(operators)組成,每個運算元都提供了next()介面,該介面每次只返回一個元組(tuple)給巢狀樹中的下一個運算元。比如上面查詢中的Filter運算元大致可以翻譯成下面的程式碼:
class Filter(child: Operator, predicate: (Row => Boolean))
extends Operator {
def next(): Row = {
var current = child.next()
while (current == null || predicate(current)) {
current = child.next()
}
return current
}
}
讓每個運算元實現迭代器介面允許查詢執行引擎來優雅地組合任意的運算元,而不必擔心每個運算元提供的資料型別。結果Volcano模型在過去的二十年間變成資料庫系統的標準,這個也是Spark使用的架構。
Volcano與手寫程式碼
如果我們給一個大學新生十分鐘的時間使用Java來實現上面的查詢。他很可能會想出一段迭代程式碼來迴圈遍歷輸入,判斷條件並計算行數,如下所示:
var count = 0
for (ss_item_sk in store_sales) {
if (ss_item_sk == 1000) {
count += 1
}
}
上面編寫的程式碼僅僅是專門解決一個給定的查詢,而且很明顯不能和其他運算元進行組合。但是這兩種實現(Volcano與手寫程式碼)方式在效能上有啥重要區別呢?一方面Spark和大多數關係型資料庫選擇這種可以對不同運算元進行組合的結構;另一方面,我們有一個由新手在十分鐘編寫的程式。我們運行了一個簡單的基準測試,對比了”大學新生”版的程式和Spark版的程式在使用單個執行緒的情況下執行上面同一份查詢,並且這些資料儲存在磁碟上,格式為Parquet。下面是它們之間的對比:
正如你所看到的,大學新生手寫版本的程式要比Volcano模式的程式要快一個數量級!事實證明,6行的Java程式碼是被優化過的,其原因如下:
- 沒有虛擬函式呼叫:在Volcano模型中,處理一個元組(tuple)最少需要呼叫一次next()函式。這些函式的呼叫是由編譯器通過虛擬函式排程實現的(通過vtable);而手寫版本的程式碼沒有一個函式呼叫。雖然虛擬函式排程是現代計算機體系結構中重點優化部分,它仍然需要消耗很多CPU指令而且相當的慢,特別是排程數十億次。
- 記憶體和CPU暫存器中的臨時資料:在Volcano模型中,每次一個運算元給另外一個運算元傳遞元組的時候,都需要將這個元組存放在記憶體中;而在手寫版本的程式碼中,編譯器(這個例子中是JVM JIT)實際上將臨時資料存放在CPU暫存器中。訪問記憶體中的資料所需要的CPU時間比直接訪問在暫存器中的資料要大一個數量級!
- 迴圈展開(Loop unrolling)和SIMD:當執行簡單的迴圈時,現代編譯器和CPU是令人難以置信的高效。編譯器會自動展開簡單的迴圈,甚至在每個CPU指令中產生SIMD指令來處理多個元組。CPU的特性,比如管道(pipelining)、預取(prefetching)以及指令重排序(instruction reordering)使得執行簡單的迴圈非常地高效。然而這些編譯器和CPU對複雜函式呼叫圖的優化極少,而這些函式正是Volcano模型依賴的。
這裡面的關鍵點是手寫版本程式碼的編寫是正對上面的查詢,所以它充分利用到已知的所有資訊,導致消除了虛擬函式的呼叫,將臨時資料存放在CPU暫存器中,並且可以通過底層硬體進行優化。
未來:整段程式碼生成
從上面的觀察我們下一步的探討自然是在執行時自動生成手寫版的程式碼的可能性,這些技術統稱為整段程式碼生成(whole-stage code generation)。這個想法是受Thomas Neumann發表在VLDB 2011上的論文Efficiently Compiling Efficient Query Plans for Modern Hardware所啟發。
我們的目標是使用整段程式碼生成使得Spark計算引擎可以實現手寫程式碼的效能,並且提供通用的功能。而不是在執行時依賴運算元來處理資料,這些運算元在執行時生成程式碼,如果可能的話將所有的查詢片段組成到單個函式中,後面我們僅需要執行生成的程式碼。
比如對於上面的查詢可以作為單個階段,Spark可以產生以下的JVM位元組碼(這裡展示的是Java程式碼)。複雜的查詢將會產生多個階段,這種情況下Spark將會產生多個不同的函式。
下面表示式中的explain()函式為整段程式碼生成進行了擴充套件。在輸出的結果裡,當運算元前面有一個*,說明整段程式碼生成被啟用。在下面情況下,Range、Filter和兩個Aggregates運算元都啟用了整段程式碼生成。然而Exchange運算元並沒有實現整段程式碼生成,因為它需要通過網路傳送資料。
spark.range(1000).filter("id > 100").selectExpr("sum(id)").explain()
== Physical Plan ==
*Aggregate(functions=[sum(id#201L)])
+- Exchange SinglePartition, None
+- *Aggregate(functions=[sum(id#201L)])
+- *Filter (id#201L > 100)
+- *Range 0, 1, 3, 1000, [id#201L]
對於這些緊密關注Spark開發的人可能會問:我在Apache Spark 1.1中就聽說過程式碼生成(code generation),它和這裡講的有啥區別?在過去和其他MPP查詢引擎一樣,Spark僅僅將程式碼生成應用於表示式求值(expression evaluation),而且僅限於很小一部分的運算元(比如Project, Filter)。也就是說,之前的程式碼生成技術僅僅是加快了表示式的求值(比如1+a);然而今天的整段程式碼生成技術實際上為整個查詢計劃生成程式碼。
Vectorization
Whole-stage code-generation技術對那些在大型資料集根據條件過濾的大規模簡單查詢非常有效,但還是存在那些無法生成程式碼將整個查詢融合到一個函式的情況。有些運算元可能非常地複雜(比如CSV解析或者Parquet解碼),或者有些情況下我們會整合第三方元件,而這些元件的程式碼是無法整合到我們生成的程式碼之中。
為了提高這些情況下的效能,我們提出一個新的方法叫做向量化(vectorization)。核心思想是:我們不是一次只處理一行資料,而是將許多行的資料分別組成batches,而且採用列式格式儲存;然後每個運算元對每個batch進行簡單的迴圈來遍歷其中的資料。所以每次呼叫next()函式都會返回一批的元組,這樣可以分攤虛擬函式呼叫的開銷。採用了這些措施之後,這些簡單的迴圈也會使得編譯器和CPU執行的更加高效。
假設有一個表有三列 (id, name, score),下面展示了面向行格式和麵向列格式的記憶體佈局:
這種風格的處理可以實現上面提到三點中的兩點(也兩點分別是:幾乎沒有虛擬函式呼叫以及自動展開迴圈/SIMD),這種風格仍然需要將臨時資料存放在記憶體中而不是存放在CPU的暫存器中。所以我們只有在無法使用Whole-stage code-generation技術的情況下才會使用vectorization技術。
比如我們已經實現了一個新的向量Parquet解碼器(vectorized Parquet reader)可以批量解碼和解壓。當解碼存在磁碟上的integer型別的列,新的解碼器大概是舊的解碼器的9倍!
效能基準測試
為了有個直觀的感受,我們記錄下在Spark 1.6和Spark 2.0中在一個核上處理一行的操作時間(單位是納秒),下面的表格能夠體現出新的Tungsten engine的威力。Spark 1.6使用的表示式程式碼生成技術同樣在今天的其他商業資料庫中採用。
我們已經調查了客戶的workloads,並且對那些使用非常頻繁的運算元實現了whole-stage code generation 技術,這些運算元包括:filter, aggregate以及hash join。正如你在上表看到的,許多核心的運算元在採用whole-stage code generation 技術之後的效能已經提高了一個數量級!然而有些運算元(比如sort-merge join)天生就很慢,所以非常難對其進行優化。
你可以在這裡看到whole-stage code generation技術的威力,在那裡我們在一臺機器上對10億條記錄進行aggregations和joins操作。不管是在Databricks platform(Intel Haswell處理器,3核)還是在一臺2013年出售的Macbook Pro(Intel Haswell i7)電腦上,對10億條元組進行hash join操作採用的時間不到1秒!
在端到端查詢這個新引擎是如何工作的?除了whole-stage code generation和vectorization技術,對一般的查詢我們在Catalyst optimizer上也進行了很多其他的工作,比如nullability propagation。我們比較了Spark 1.6和Spark 2.0在使用TPC-DS查詢的基本分析,如下圖:
那是不是意味著你把Spark升級Spark 2.0,所以的workload將會變的比之前快10倍呢?其實不是,雖然我們相信新的Tungsten引擎為我們的效能優化實現了最好的資料處理架構;但是我們需要理解的是,不是所有的workload都能享受到同樣的程度。比如可變長度的資料型別(如字串)對其操作本身就非常昂貴;還有些workloads受其他因素影響,比如I/O吞吐量以及元資料操作等。對於之前那些受CPU效率影響的Workloads將會獲得最大的效率。
結論
本文提到的絕大部分工作已經提交到Apache Spark的程式碼中,並且將會在Spark 2.0版本釋出。關於whole-stage code generation技術的JIRA可以到SPARK-12795裡檢視;而關於vectorization技術的JIRA可以到SPARK-12992檢視。
總結一下,本文主要描述了第二代Tungsten執行引擎。通過whole-stage code generation技術,這個引擎可以(1)、消除虛擬函式呼叫;(2)、將臨時資料從記憶體中移到CPU暫存器中;(3)、利用現代CPU特性來展開迴圈並使用SIMD功能。通過vectorization技術,可以加快那些程式碼生成比較複雜的運算元執行速度。對於資料處理中很多核心運算元,新的引擎會使它們的執行速度提升一個數量級。在未來,考慮到執行引擎的效率,我們大部分的優化工作將會轉移到優化I/O效率以及更好的查詢計劃。