1. 程式人生 > >Spark的下一代引擎-Project Tungsten啟示錄:兼Presto、impala、spark效能根本比較

Spark的下一代引擎-Project Tungsten啟示錄:兼Presto、impala、spark效能根本比較

引自:http://blog.csdn.net/ytbigdata/article/details/50721174

        在過去的一年之中,我們一直在利用Spark做實時互動式分析系統方面的嘗試,有興趣的同學可以看一下我們之前分享的部落格《基於Spark的使用者分析系統》。我們在不斷受到Spark啟發的同時,也不得不忍受尚處於青春期的Spark性格中的叛逆。特別是在不斷優化系統性能過程中,發現我們實際上是在做與Project Tungsten同樣的工作。不知道是該慶幸選對了方向,還是該憂傷重複發明了輪子。尤其是在對比了Project Tungsten與我們自己的實現,心中五味雜陳。不過也正是由於重複發明輪子的過程,也讓我們對Project Tungsten有了自己的理解,所以在這裡聊一下Project Tungsten背後的黑科技。

1. Project Tungsten的野心

        如果你對ProjectTungsten還比較陌生,建議大家看一下這篇部落格《Project Tungsten: Bringing Spark Closer to Bare Metal》。在這篇部落格中,作者指出ProjectTungsten是為了大幅提升Spark應用使用CPU和Memory的效率,讓Spark的效能接近硬體的極限。聽著就振奮人心,卻不禁引起大家對於Spark效能的疑惑,難道我們看到的比hadoop快幾十到幾百倍的效率還不是效能的極限嗎?答案是遠遠沒到,實際上目前Spark享受的福利還僅僅是將資料放在了記憶體中,相對於很多其他的框架例如Apache Drill以及Lucene,Spark在調優方面做得工作只能說是剛剛開始。說了這麼多,我們首先來分析一下Project Tungsten在搗鼓些什麼:

  • 記憶體管理與二進位制處理(Memory Management and Binary Processing)
  • Cache-aware計算(Cache-awareComputation)
  • 程式碼生成(Code Generation)

看著這三個方面,你可能有很多的疑惑,這完全沒有一個主線啊,DataBricks真是不按常理出牌!可是如果你曾經試圖利用Java寫一個數據庫,或者其他資料密集型的應用時,你會發現這三個方向幾乎是你必然的選擇。

1.1 記憶體管理與二進位制處理

        首先聊一下記憶體管理以及二進位制處理,在相當多的場景中IO是我們程式永恆的瓶頸,我們總是試圖做批處理,基於列儲存,分割槽甚至是倒排索引,這一切的努力都是在解決磁碟的IO瓶頸。但是如果資料完全放入了記憶體之後,我們面臨的新問題是什麼呢?CPU不夠用。其實我們在做實時互動式分析系統的時候就發現了這個問題,我們從來沒有如此希望增加CPU的核數,但是叢集告訴我們CPU的核數是有限的。那麼問題出現在哪兒?其實是我們沒有把CPU的資源用在刀刃上。以普通的DDR 3 1666MHz的為例,理論值能夠達到10GB/s的讀取能力,實際值大概在5GB/s附近,相對於普通機械硬碟30MB/s的連續讀取能力,我們可以想象一下,相對於從磁碟讀取,從記憶體直接讀取資料對CPU的計算能力訴求有多大。如果再考慮多通道的問題,這個資料量將會按倍數增加。面對這麼多的資料,我們可憐的CPU要做哪些工作呢?第一個繁重的工作就是序列化與反序列化,以Java為例這個過程說白了就是完成一堆的物件和一堆二級制資料之間的相互轉化。那麼為什麼Spark需要序列化和反序列化呢,原因很簡單就是為了溝通,為了能夠將資料從一個例項搬到另外一個例項,這是不是讓你想到shuffle的過程。第二個繁重的工作是建立物件,你可能會反駁JVM new一個物件是多麼得有效率,而實際上當資料像潮水般湧來,讓CPU把他們都包裝成一個個的物件寫回到記憶體中,這個建立的時間就不能忽略了。不僅如此,建立過多的物件帶來的是大量的記憶體消耗和GC時間,我們都知道GC也是要消耗CPU時間片的。有時候建立了物件還不是一切的終點,我們需要構建一個有效的資料結構,比如HashMap或者HashSet,構建這些資料結構的時候,大量的比較或者hash值計算都是CPU的天敵。所以大家可能會體會到RDD的join操作是多麼得讓人著急。第三個工作可能是不停地壓縮資料和解壓縮資料,如果選擇了對序列化後的資料進行壓縮編碼處理,一旦遇到這種密集型的計算訴求,CPU就會成為絕對的效能瓶頸。

1.2 Cache-aware

        Cache-aware並不是一個新的概念,在上個世紀90年代的時候就有學者在這些方面做過很多的研究,有興趣的朋友可能看看Cache-aware(《An Overview of Cache Optimization Techniques and Cache{Aware Numerical Algorithms》)以及Cache-oblivious(《Cache-Oblivious Algorithms and Data Structures》)相關的演算法方面的研究。單純從名字就可以看出,這個Cache-aware就是要讓大家牢記CPU是有一級和二級快取的,CPU在讀取記憶體資料的時候不是一個位元組一個位元組地讀取的,而是按照cache line進行讀取,也就是我們印象中CPU依次讀取8個位元組。更進一步我們是不是還朦朧的記得現代CPU的向量計算,也就是SIMD Programming,有興趣的同學可以看一下《Basics of SIMD Programming》。如果每次CPU都是在隨機的地址讀取資料,由於CPU的頻率比記憶體的頻率高很多,就會造成CPU長時間處於等待的狀態。那麼我們的Spark是不是都在利用這些特性呢?事實上Spark是構建在JVM上的,JVM基本上都不理會以上介紹的這些CPU技能(當然JIT背後的黑魔法有可能會應用上這些特性,筆者確實沒有深入研究過),在一些特殊情況下,比如一個String型別的資料進行排序的時候,實際上首先傳入CPU的是物件的引用,在進行比較是需要重新找到對應的位元組,這是一個非常耗費CPU資源的操作,產生的cache miss會造成程式慢上數倍。此外由於GC的存在,即使是一開始連續分配的記憶體,也有可能在隨後GC過程中被徹底打散,造成CPU的隨機訪問記憶體。如果你需要強行用上CPU這些奧義,那麼你就需要打怪升級了。

1.3 程式碼生成

        這是一個很有意思的領域,作為一個程式設計師如果瞭解瞭解釋器、執行計劃以及程式碼生成,你就會發現你下一步想要做的事就是git clone一下mysql的原始碼了,高階的同學們就會考慮寫一個自己喜歡的語言了。實際上如果你閱讀了spark-catalyst以及DataFrame的實現之後,你會發現他們就是在搞一個記憶體版的基於列資料庫。回到優化這個主題上來,為什麼要進行程式碼生成,不知道各位有沒有寫過一個sql解析的程式,就是那個典型的visitor模式。你會發現在進行了AST語義解析之後你會生成一個簡單的邏輯執行計劃,而這個執行計劃其實就是一個嵌套了各類操作的方法。實際上這個方法是可以進行直接執行的,但是你會發現這個程式碼慢的有點驚人。舉一個簡單的例子,如果用AST表達a+b/c+d*e,你會得到一個三層巢狀的邏輯樹,在完成賦值操作(ValueNode)後,首先計算b/c(BinaryNode)、d*e(BinaryNode),然後計算a+(b/c)(BinaryNode),然後再計算(a+b/c)+d*e(BinaryNode),邏輯看著都費勁,這種pop和push棧的操作肯定也是快不起來。有沒有一種方法能夠生成一個更加高效的執行程式碼呢?答案就是在優化階段生成一個以下的函式:

  1. publicint fun(int a,int b, int c ,intd,int e){  
  2.       return a+b/c+d*e;  
  3. }  

        這個程式碼裡面把那些煩人的BinaryNode巢狀替換成了普通的程式碼,然後利用編譯器編譯一下上面的程式碼,這時候你就會發現一切都變得美好了。道理就是這麼簡單,但是實現過程中可能需要處理SQL協議,執行計劃優化等問題,關於SQL解析大家可以看一下《SQL解析過程詳解》。生成動態程式碼用什麼編譯器會比較快一點,比如大家耳熟能詳的LLVM(C++)/JavaCompiler(Java)/Janino(Java)。不少框架為了統一介面,採用JSON的互動方式,比如ElasticSearch,但實際上原理都是一樣的。Spark使用了Janino作為程式碼生成的預設Compiler,其實在編譯器上也沒有太多的選擇餘地。

        介紹完Project Tungsten的三個方向之後,你會發現一條清晰的主線,那就是構造一個更加有效地分析引擎,而且這個分析引擎的大部分優化靈感都來自於關係資料庫。與此同時我們體會到Spark的無奈,Project Tungsten其實是為了Spark選擇Scala而買單。

2. Spark與JVM的緣起緣滅

    在講ProjectTungsten之前其實應該聊聊Spark的實現,眾所周知,Spark選擇了Scala作為實現語言。與其說選擇Scala是Spark的敗筆,還不如說是一種妥協。從語言性質上說,Scala是一個非常易於使用的語言,特別是擁有.map().filter().reduce()這樣優雅的介面,當你利用Spark寫一個mapreduce程式的時候,你會覺得這才是我想要的語言。但是如果你在閱讀akka的原始碼時,你可能就會因為晦澀的程式碼而叫苦不迭。因此Scala是一個為API設計的語言,與此同時為了能夠複用Java的開源元件,Scala又選擇了JVM,於是Spark的調優就與JVM結下了不解之緣。其實選擇JVM並不是一個壞選擇,在多數情況下程式會執行地很好。但是當Spark遇到shuffle以及DataFrame時,這就是JVM不太擅長的領域了。FULL GC的長時間停頓將會嚴重影響一個SQL的執行時間,如果這個系統又是一個分散式的系統,你無法控制讓所有的例項都保持相同的GC頻率,這個時候根據木桶定律,這個SQL的執行時間取決於執行最慢的那個節點。實際上每一次執行過程中都需要面對GC的問題,根據我們的經驗,有時候在進行大規模的資料聚合操作(aggregation)時,GC的時間甚至要比執行的時間多2到3倍。這個時候我們會發現在利用Spark做一個秒級響應的分散式互動分析系統是多麼地困難。當然為了解決最短的一塊木板的問題,我們可以採取speculation的方式緩解,然而這種方式沒有觸動一個核心的問題:我們能不能控制GC的回收策略?比如我們需要複用的資料是不需要GC的,臨時生成的物件是不需要放到Old Generation的,我們自己或者是Spark比JVM更加了解資料的特性。事實上JVM沒有提供方便的介面來實現這些,為了能夠稍微控制一下GC的命脈,唯一的救命稻草就是sun.misc.Unsafe。據說Oracle正在討論在Java 9 中刪除Unsafe類,不知道Oracle的這種執著會造成大量的框架開發者投向Golang的懷抱。實際上在Project Tungsten之前Spark已經利用Netty改造了網路層傳輸問題,而Netty用了多少unsafe的程式碼,各位同學可以自己研究一下。以上就是JVM與Spark的前世今生,不僅如此,RDD有很多的transform操作比如map(),filter(), join()實際上是在重新生成一個容器,可想而知當資料量非常大的時候,JVM要建立多少的物件,同時這些物件又有多少會進入到Old Generation。有的同學可能會想到複用物件,那麼複用物件是不是最終的解決方法呢?答案是看情況,比如你只是做一個SQL的查詢操作,複用是一個非常好的思路;如果你是在做一個分類器訓練的時候,也許transform後的資料會成為下面幾步迭代的輸入資料,生成一個新的Array並且cache()一下會讓一切變得更快。

3. Project Tungsten的葵花寶典

    這一回我們要回到枯燥的程式碼上來了,這一章還是儘量多講大道理,少擺程式碼,畢竟程式碼都在那兒,大家有時間可以自己看一遍。道理講明白了,解決方案也都放在哪裡,剩下的事情就變得相對簡單了。我們可以來看一下在過去的一年中Project Tungsten針對這些問題修煉了哪些神功。

3.1 二進位制處理

       ok,我們首先翻開ProjectTungsten的第一章,欲練此功,必先自宮。這裡是不是要拋棄JVM呢?當然如果你有的是時間,並且能夠說服所有的Java程式設計師接受delete,我覺得放棄JVM是美好的。如果無法放棄,我建議大家首先看一下spark-unsafe,你會發現所有的黑科技都在這個工程中。這個工程還只是一個嬰兒,前後加起來也不到20個類,而且邏輯都非常清晰,封裝一個好用Unsafe工具類Platform,然後抽象了一個MemoryBlock的管理連續記憶體的類,剩下的就是管理連續記憶體以及基於連續記憶體封裝的一系列資料結構。前面已經提到了CPU載入資料是以cache line為單位的,那麼什麼樣結構的資料是CPU友好型別的呢?答案就是連續記憶體,不管是堆上連續記憶體還是直接連續記憶體。其實說到這裡就非常有意思了,如果需要應用連續記憶體,我們就需要按照byte來管理我們的資料,彷彿一夜又回到了c的時代。雖然操作二進位制資料不太方便,但是帶來的好處是非常明顯的:可以愉快地擁抱序列化和反序列化了,也可以用上CPU一次載入8個位元組的技能了(cache-aware演算法),甚至可以用上SIMD向量計算了。不僅如此,你會發現壓縮演算法比例也會有提升,同時如果用上直接記憶體(Direct Memory),GC的時間也會跟著減少。剩下來的問題是怎麼從連續記憶體中讀取資料?相信大家對於mysql的表的定義非常熟悉,就是為每一個數據的欄位設定一個元資料,這樣是int型別就讀4個位元組,是long型別就讀8個位元組,是String型別的資料可以先記錄一個地址和偏移量等。這也是為什麼你在載入DataFrame資料的時候需要提供一個json格式元資料定義。筆者測試過,利用unsafe讀直接記憶體的效率與直接操作讀byte[]陣列的效率相差無幾,但是利用unsafe的setLong()來寫資料是直接寫byte[]速度的8倍。

       Project Tungsten在記憶體管理上也是有值得借鑑的地方,比如MemoryBlock就是對於連續記憶體塊的封裝,記錄了位元組長度以及引用的位置。一個數據集就可以由List<MemoryBlock>來記錄記憶體page,通過TaskMemoryManager的encodePageNumberAndOffset方法來編碼記憶體地址,是不是有點類似Linux管理記憶體的方式呢?利用MemoryBlock甚至可以寫一個CPU友好的Map,例如org.apache.spark.unsafe.map.BytesToBytesMap。Java中的HashMap實際上是由一個數組和連結串列實現的,然而這種實現將記憶體資料徹底打散,CPU的執行效率自然無法跟上。Databricks想到利用一個連續記憶體記錄hash值以及記憶體地址,同時將key和value都記錄到MemoryBlock中,這樣構建的Map雖然不具有通用的功能,但是在有些操作比如大量資料aggregation操作時,效率是非常出色的,同時加上使用直接記憶體避開了GC,想想都有點小激動。

 3.2 Cache-aware演算法

        我們可以在spark-core的org.apache.spark.util.collection.unsafe.sort包找到ProjectTungsten在Cache-aware方面做出的努力。例如RecordPointerAndKeyPrefix和UnsafeInMemorySorter這兩個類,RecordPointerAndKeyPrefix實際上儲存了一個long型別的引用和一個long型別的key prefix,這個類的物件實際上是複用的,一般是從一個連續記憶體中取出16個位元組,前8個位元組是key value在連續記憶體中的地址,而後8個自己是自定義的keyprefix,然後賦值給這個物件。這樣就可以利用key prefix做一個初步的比較操作了,而不用再去隨機查詢pointer對應的實際key value再來進行比較。當然如果發生key prefix相同,就需要比較真正的value值了。

  1. @Override
  2. publicint compare(RecordPointerAndKeyPrefix r1, RecordPointerAndKeyPrefix r2) {  
  3.   finalint prefixComparisonResult = prefixComparator.compare(r1.keyPrefix, r2.keyPrefix);  
  4.   if (prefixComparisonResult == 0) {  
  5.     final Object baseObject1 = memoryManager.getPage(r1.recordPointer);  
  6.     finallong baseOffset1 = memoryManager.getOffsetInPage(r1.recordPointer) + 4// skip length
  7.     final Object baseObject2 = memoryManager.getPage(r2.recordPointer);  
  8.     finallong baseOffset2 = memoryManager.getOffsetInPage(r2.recordPointer) + 4// skip length
  9.     return recordComparator.compare(baseObject1, baseOffset1, baseObject2, baseOffset2);  
  10.   } else {  
  11.     return prefixComparisonResult;  
  12.   }  
  13. }  

        這是一個非常巧妙的設計,對比一下String物件的排序,隨機查詢記憶體的機率大大減少,又由於所有的key都是儲存在連續記憶體中的,可以大大加速排序的過程。循著這個思路,或者一些經典的Cache-aware的演算法,利用Unsafe對於連續記憶體的操作,可以極大地提升系統的效能,相信Spark將會在這方面做更多的努力。

3.3 Code Generation

    其實如果拋開直譯器、邏輯執行計劃、物理執行計劃以及優化器來討論程式碼生成是一個管中窺豹的行為,因為能夠用上程式碼生成的地方一般都是在構造一個查詢或者分析引擎的時候。當然如果你是在寫一個類似於Spring、Hibernate或者JUnit的框架,你也會發現程式碼生成也是非常有用的工具。其實程式碼生成一般都用在程式碼不好寫或者改寫複雜的情況下,例如在有一次面試的過程中,就遇到一位候選者利用ASM的動態程式碼生成完成自動配置的工作,這是非常有趣的一個方向。說了這麼多還是希望大家仔細閱讀一下spark-catalyst的所有原始碼。

    動態程式碼生成其實網上資料非常多,介紹JavaCompiler、Janino或者是ASM框架使用和Benchmark的文章比比皆是。這裡就簡單的說說我們在實際應用過程中的體會,JavaCompiler實際上是需要通過一個原始碼的位元組流來進行編譯程式碼的,生成的class檔案會儲存到本地磁碟上,然後如果需要使用這個類構建物件或者其中的靜態方法,就需要通過ClassLoader來載入位元組碼,進而就可以利用這個動態生成的類建立物件了。Janino有一個自己的編譯器,同時也可以配置成JavaCompiler,提供的結構非常方便,但是相對於JavaCompiler,Janino自身的編譯器編譯出來的程式碼執行效率會慢10倍。ASM則是通過直接寫位元組碼來動態生成,編譯的速度非常快,但是動態程式碼的效率與Janino的效率差不多,執行速度都比較慢,特別是當這個動態程式碼需要執行上億次,差距非常明顯。如果你的程式碼只是執行一遍,我覺得ASM極快的編譯速度會讓它成為最好的選擇。

4. 總結

        看了上面的介紹,相信各類看官對於Project Tungsten或多或少都有了一定的瞭解,Project Tungsten其實不是一個非常神祕的計劃,其實它的存在是選擇JVM的資料引擎必然的優化之路。與其說Project Tungsten將鑄就下一次Spark資料引擎的基石,還不如說是長嘆一聲出身的無奈。Project Tungsten可以說是一個非常簡單,同時也還沒有成熟的實現方案,對於廣大的Java、Scala程式設計師來說是寶貴的學習材料和案例。希望這篇部落格能夠幫助大家選擇適合自身專案的優化方向,欲知下事如何,且聽下回分解。