Hadoop和spark為什麼要對key進行排序
1.思考
只要對hadoop
中mapreduce
的原理清楚的都熟知下面的整個流程執行原理,其中涉及到至少三次排序,分別是溢寫快速排序,溢寫歸併排序,reduce
拉取歸併排序,而且排序是預設的,即天然排序的,那麼為什麼要這麼做的,設計原因是什麼。先給個結論,為了整體更穩定,輸出滿足多數需求,前者體現在不是採用hashShuffle
而是sortShuffle
,後者體現在預計算,要知道排序後的資料,在後續資料使用時的會方便很多,比如體現索引的地方,如reduce
拉取資料時候。
2.MapReduce原理分析
在分析設計原因之前,先理解一下整個過程,在map
階段,根據預先定義的partition
規則進行分割槽,map
spill
到硬碟,每一次spill
都會在硬碟產生一個spill
檔案,因此一個map task可能會產生多個spill
檔案,其中在每次spill
的時候會對key
進行排序。接下來進入shuffle
階段,當map
寫出最後一個輸出,需要在map
端進行一次merge
操作,按照partition
和partition
內的key
進行歸併排序(合併+排序),此時每個partition
內按照key
值整體有序。然後開始第二次merge
,這次是在reduce
端,在此期間資料在記憶體和磁碟上都有,其實這個階段的merge
並不是嚴格意義上的排序,也是跟前面類似的合併+排序,只是將多個整體有序的檔案merge
MapReduce
框架的話,考慮用HashMap
輸出map內容即可。
2.1 MapTask執行機制詳解
整個流程圖如下:
詳細步驟:
-
首先,讀取資料元件
InputFormat
(預設TextInputFormat
)會通過getSplits
方法對輸⼊入⽬目錄中檔案進行邏輯切⽚片規劃得到splits
,有多少個split
就對應啟動多少個MapTask
。split
與block
的對應關係預設是⼀對⼀。 -
將輸入檔案切分為
splits
之後,由RecordReader
物件(預設LineRecordReader
)進行讀取,以\n
<key,value>
。Key
表示每⾏行行⾸首字元偏移值,value
表示這⼀行文字內容。 -
讀取
split
返回<key,value>
,進⼊入⽤使用者自己繼承的Mapper
類中,執行使用者重寫的map
函式。RecordReader
讀取⼀行這裡呼叫一次。 -
map
邏輯完之後,將map
的每條結果通過context.write
進⾏行行collect
資料收集。在collect
中,會先對其進行分割槽處理,預設使用HashPartitioner
。MapReduce
提供Partitioner
介面,它的作用就是根據key
或value
及reduce
的數量來決定當前的這對輸出資料最終應該交由哪個reduce task
處理。預設對key hash
後再以reduce task
數量量取模。預設的取模方式只是為了平均reduce
的處理能力,如果使用者自己對Partitioner
有需求,可以訂製並設定到job
上。 -
接下來,會將資料寫入記憶體,記憶體中這⽚片區域叫做環形緩衝區,緩衝區的作用是批量量收集
map
結果,減少磁碟IO
的影響。我們的key/value
對以及Partition
的結果都會被寫⼊入緩衝區。當然寫⼊入之前,key
與value
值都會被序列列化成位元組陣列-
環形緩衝區其實是一個數組,陣列中存放著
key
、value
的序列化資料和key
、value
的元資料資訊,包括partition
、key
的起始位置、value
的起始位置以及value
的長度。環形結構是一個抽象概念。 -
緩衝區是有大小限制,預設是
100MB
。當map task
的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫⼊入磁碟,然後重新利利⽤用這塊緩衝區。這個從記憶體往磁碟寫資料的過程被稱為Spill
,中文可譯為溢寫。這個溢寫是由單獨執行緒來完成,不影響往緩衝區寫map結果的執行緒。溢寫執行緒啟動時不不應該阻⽌止
map的結果輸出,所以整個緩衝區有個溢寫的⽐比例例spill.percent
。這個⽐比例例預設是0.8
,也就是當緩衝區的資料已經達到閾值(buffer size * spillpercent = 100MB * 0.8 = 80MB)
,溢寫執行緒啟動,鎖定這80MB
的記憶體,執行溢寫過程Maptask
的輸出結果還可以往剩下的20MB
記憶體中寫,互不不影響、
-
-
當溢寫執行緒啟動後,需要對這
80MB
空間內的key
做排序(Sort
)。排序是MapReduce
模型預設的⾏行行為!-
如果
job
設定過Combiner
,那麼現在就是使⽤用Combiner
的時候了了。將有相同key
的key/value
對的value
加起來,減少溢寫到磁碟的資料量量。Combiner
會優化MapReduce
的中間結果,所以它在整個模型中會多次使用。 -
那哪些場景才能使⽤用
Combiner
呢?從這⾥裡里分析,Combiner
的輸出是Reducer
的輸⼊,Combiner
絕不不能改變最終的計算結果。Combiner
只應該⽤用於那種Reduce
的輸入key/value
與輸出key/value
型別完全一致,且不不影響最終結果的場景。⽐比如累加,最⼤大值等。Combiner
的使⽤用一定得慎重如果用的好,它對job
執⾏行行效率有幫助,反之會影響reduce
的最終結果
-
-
合併溢寫檔案:每次溢寫會在磁碟上生成一個臨時檔案(寫之前判斷是否有
combiner
),如果map
的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個臨時檔案存在。當整個資料處理理結束之後開始對磁碟中的臨時檔案進⾏行行merge
合併,因為最終檔案只有一個,寫⼊磁碟,並且為這個檔案提供了一個索檔案,以記錄每個reduce
對應資料的偏移量量。
2.2 ReduceTask執行機制詳解
Reduce
⼤大致分為copy
、sort
、reduce
三個階段,重點在前兩個階段。copy
階段包含⼀一個
eventFetcher
來獲取已完成的map
列列表,由Fetcher執行緒去copy
資料,在此過程中會啟動兩個merge
執行緒,分別為inMemoryMerger
和onDiskMerger
,分別將記憶體中的資料merge
到磁碟和將磁碟中的資料進⾏merge
。待資料copy
完成之後,copy
階段就完成了,開始進⾏行行sort
階段,sort
階段主要是執⾏finalMerge
操作,純粹的sort
階段,完成之後就是reduce
階段,調⽤用⽤使用者定義的reduce
函式進⾏處理。
詳細步驟
2.2.1 Copy階段
簡單地拉取資料。Reduce
程序啟動一些資料copy
執行緒(Fetcher
),通過HTTP
方式請求maptask
獲取屬於自己的檔案。
2.2.2 Merge階段
Merge
階段。這⾥裡里的merge
如map
端的merge
動作,只是陣列中存放的是不不同map
端copy
來的數值。Copy
過來的資料會先放入記憶體緩衝區中,這⾥裡里的緩衝區大小要⽐比map
端的更更為靈活。merge
有三種形式:記憶體到記憶體;記憶體到磁碟;磁碟到磁碟。預設情況下第⼀一種形式不不啟⽤用。當記憶體中的資料量量到達一定閾值,就啟動記憶體到磁碟的merge
。與map
端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner
,也是會啟⽤用的,然後在磁碟中生成了了眾多的溢寫檔案。第二種merge方式⼀一直在運⾏行行,直到沒有map
端的資料時才結束,然後啟動第三種磁碟到磁碟的merge
方式生成最終的檔案。
2.2.3 合併排序
把分散的資料合併成一個⼤大的資料後,還會再對合並後的資料排序。對排序後的鍵值對調⽤用reduce
方法,鍵相等的鍵值對調⽤用一次reduce
方法,每次調⽤用會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS
檔案中。
3.總結
從MapReduce
的執行過程中,我們再來看為什麼要排序,以及為什麼在Shuffle
時候採用SortShuffle
,從設計上來看,MapTask和ReduceTask就是完全不同的跑在Yarn上的兩個程序,程序的互動方式是通過記憶體或者磁碟,為了兩個程式不耦合,更好地實現失敗重試等機制,那麼就不能像Kafka一樣,生產者生產訊息和消費者消費訊息的時候,會有阻塞等問題,不能讓叢集卡住,MapReduce跑的資料都是大批量的資料,所以要儘可能讓Map端處理完成的資料落盤但又要保證儘可能加快整個速度,所以在map結束時候,給reduce的是排序好的資料外加一份索引檔案,這樣雖然犧牲了一定的cpu,但是對落盤後的資料,讓Reduce來拉取時候可以儘可能地快,Map如何結束執行完,理論上可以在停機後,繼續跑ReduceTask,來完成整個任務同時為什麼不是HashShuffe呢,是因為大資料情況下HashShuffle佔用的記憶體很大,很可能會爆記憶體,導致叢集計算不穩定。
吳邪,小三爺,混跡於後臺,大資料,人工智慧領域的小菜鳥。
更多請關注