1. 程式人生 > >spark的shuffle機制

spark的shuffle機制

對於大資料計算框架而言,Shuffle階段的設計優劣是決定效能好壞的關鍵因素之一。本文將介紹目前Spark的shuffle實現,並將之與MapReduce進行簡單對比。本文的介紹順序是:shuffle基本概念,MapReduce Shuffle發展史以及Spark Shuffle發展史。

 

(1)  shuffle基本概念與常見實現方式

shuffle,是一個運算元,表達的是多對多的依賴關係,在類MapReduce計算框架中,是連線Map階段和Reduce階段的紐帶,即每個Reduce Task從每個Map Task產生數的據中讀取一片資料,極限情況下可能觸發M*R個數據拷貝通道(M是Map Task數目,R是Reduce Task數目)。通常shuffle分為兩部分:Map階段的資料準備和Reduce階段的資料拷貝。首先,Map階段需根據Reduce階段的Task數量決定每個Map Task輸出的資料分片數目,有多種方式存放這些資料分片:

1) 儲存在記憶體中或者磁碟上(Spark和MapReduce都存放在磁碟上);

2) 每個分片一個檔案(現在Spark採用的方式,若干年前MapReduce採用的方式),或者所有分片放到一個數據檔案中,外加一個索引檔案記錄每個分片在資料檔案中的偏移量(現在MapReduce採用的方式)。

在Map端,不同的資料存放方式各有優缺點和適用場景。一般而言,shuffle在Map端的資料要儲存到磁碟上,以防止容錯觸發重算帶來的龐大開銷(如果儲存到Reduce端記憶體中,一旦Reduce Task掛掉了,所有Map Task需要重算)。但資料在磁碟上存放方式有多種可選方案,在MapReduce前期設計中,採用了現在Spark的方案(目前一直在改進),每個Map Task為每個Reduce Task產生一個檔案,該檔案只儲存特定Reduce Task需處理的資料,這樣會產生M*R個檔案,如果M和R非常龐大,比如均為1000,則會產生100w個檔案,產生和讀取這些檔案會產生大量的隨機IO,效率非常低下。解決這個問題的一種直觀方法是減少檔案數目,常用的方法有:1) 將一個節點上所有Map產生的檔案合併成一個大檔案(MapReduce現在採用的方案),2) 每個節點產生{(slot數目)*R}個檔案(Spark優化後的方案)。對後面這種方案簡單解釋一下:不管是MapReduce 1.0還是Spark,每個節點的資源會被抽象成若干個slot,由於一個Task佔用一個slot,因此slot數目可看成是最多同時執行的Task數目。如果一個Job的Task數目非常多,限於slot數目有限,可能需要執行若干輪。這樣,只需要由第一輪產生{(slot數目)*R}個檔案,後續幾輪產生的資料追加到這些檔案末尾即可。因此,後一種方案可減少大作業產生的檔案數目。

在Reduce端,各個Task會併發啟動多個執行緒同時從多個Map Task端拉取資料。由於Reduce階段的主要任務是對資料進行按組規約。也就是說,需要將資料分成若干組,以便以組為單位進行處理。大家知道,分組的方式非常多,常見的有:Map/HashTable(key相同的,放到同一個value list中)和Sort(按key進行排序,key相同的一組,經排序後會挨在一起),這兩種方式各有優缺點,第一種複雜度低,效率高,但是需要將資料全部放到記憶體中,第二種方案複雜度高,但能夠藉助磁碟(外部排序)處理龐大的資料集。Spark前期採用了第一種方案,而在最新的版本中加入了第二種方案, MapReduce則從一開始就選用了基於sort的方案。

(2) MapReduce Shuffle發展史

【階段1】:MapReduce Shuffle的發展也並不是一馬平川的,剛開始(0.10.0版本之前)採用了“每個Map Task產生R個檔案”的方案,前面提到,該方案會產生大量的隨機讀寫IO,對於大資料處理而言,非常不利。

【階段2】:為了避免Map Task產生大量檔案,HADOOP-331嘗試對該方案進行優化,優化方法:為每個Map Task提供一個環形buffer,一旦buffer滿了後,則將記憶體資料spill到磁碟上(外加一個索引檔案,儲存每個partition的偏移量),最終合併產生的這些spill檔案,同時建立一個索引檔案,儲存每個partition的偏移量。

(階段2):這個階段並沒有對shuffle架構做調成,只是對shuffle的環形buffer進行了優化。在Hadoop 2.0版本之前,對MapReduce作業進行引數調優時,Map階段的buffer調優非常複雜的,涉及到多個引數,這是由於buffer被切分成兩部分使用:一部分儲存索引(比如parition、key和value偏移量和長度),一部分儲存實際的資料,這兩段buffer均會影響spill檔案數目,因此,需要根據資料特點對多個引數進行調優,非常繁瑣。而MAPREDUCE-64則解決了該問題,該方案讓索引和資料共享一個環形緩衝區,不再將其分成兩部分獨立使用,這樣只需設定一個引數控制spill頻率。

【階段3(進行中)】:目前shuffle被當做一個子階段被嵌到Reduce階段中的。由於MapReduce模型中,Map Task和Reduce Task可以同時執行,因此一個作業前期啟動的Reduce Task將一直處於shuffle階段,直到所有Map Task執行完成,而在這個過程中,Reduce Task佔用著資源,但這部分資源利用率非常低,基本上只使用了IO資源。為了提高資源利用率,一種非常好的方法是將shuffle從Reduce階段中獨立處理,變成一個獨立的階段/服務,由專門的shuffler service負責資料拷貝,目前百度已經實現了該功能(準備開源?),且收益明顯,具體參考:MAPREDUCE-2354

(3) Spark Shuffle發展史

目前看來,Spark Shuffle的發展史與MapReduce發展史非常類似。初期Spark在Map階段採用了“每個Map Task產生R個檔案”的方法,在Reduce階段採用了map分組方法,但隨Spark變得流行,使用者逐漸發現這種方案在處理大資料時存在嚴重瓶頸問題,因此嘗試對Spark進行優化和改進,相關連結有:External Sorting for Aggregator and CoGroupedRDDs,“Optimizing Shuffle Performance in Spark”,“Consolidating Shuffle Files in Spark”,優化動機和思路與MapReduce非常類似。

Spark在前期設計中過多依賴於記憶體,使得一些執行在MapReduce之上的大作業難以直接執行在Spark之上(可能遇到OOM問題)。目前Spark在處理大資料集方面尚不完善,使用者需根據作業特點選擇性的將一部分作業遷移到Spark上,而不是整體遷移。隨著Spark的完善,很多內部關鍵模組的設計思路將變得與MapReduce升級版Tez非常類似。