1. 程式人生 > >MapTask和ReduceTask執行機制以及Map任務的並行度

MapTask和ReduceTask執行機制以及Map任務的並行度

1、MapTask執行機制詳解以及Map任務的並行度

整個Map階段流程大體如下圖所示。 在這裡插入圖片描述

簡單概述:inputFile通過split被邏輯切分為多個split檔案,通過Record按行讀取內容給map(使用者自己實現的)進行處理,資料被map處理結束之後交給OutputCollector收集器,對其結果key進行分割槽(預設使用hash分割槽),然後寫入buffer,每個map task都有一個記憶體緩衝區,儲存著map的輸出結果,當緩衝區快滿的時候需要將緩衝區的資料以一個臨時檔案的方式存放到磁碟,當整個map task結束後再對磁碟中這個map task產生的所有臨時檔案做合併,生成最終的正式輸出檔案,然後等待reduce task來拉資料。

詳細步驟:

1、 首先,讀取資料元件InputFormat(預設TextInputFormat)會通過getSplits方法對輸入目錄中檔案進行邏輯切片規劃得到splits,有多少個split就對應啟動多少個MapTask。split與block的對應關係預設是一對一。

2、 將輸入檔案切分為splits之後,由RecordReader物件(預設LineRecordReader)進行讀取,以\n作為分隔符,讀取一行資料,返回<key,value>。Key表示每行首字元偏移值,value表示這一行文字內容。

3、 讀取split返回<key,value>,進入使用者自己繼承的Mapper類中,執行使用者重寫的map函式。RecordReader讀取一行這裡呼叫一次。

4、 map邏輯完之後,將map的每條結果通過context.write進行collect資料收集。在collect中,會先對其進行分割槽處理,預設使用HashPartitioner。 MapReduce提供Partitioner介面,它的作用就是根據key或value及reduce的數量來決定當前的這對輸出資料最終應該交由哪個reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。

5、接下來,會將資料寫入記憶體,記憶體中這片區域叫做環形緩衝區,緩衝區的作用是批量收集map結果,減少磁碟IO的影響。我們的key/value對以及Partition的結果都會被寫入緩衝區。當然寫入之前,key與value值都會被序列化成位元組陣列。 環形緩衝區其實是一個數組,陣列中存放著key、value的序列化資料和key、value的元資料資訊,包括partition、key的起始位置、value的起始位置以及value的長度。環形結構是一個抽象概念。 緩衝區是有大小限制,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫入磁碟,然後重新利用這塊緩衝區。這個從記憶體往磁碟寫資料的過程被稱為Spill,中文可譯為溢寫。這個溢寫是由單獨執行緒來完成,不影響往緩衝區寫map結果的執行緒。溢寫執行緒啟動時不應該阻止map的結果輸出,所以整個緩衝區有個溢寫的比例spill.percent。這個比例預設是0.8,也就是當緩衝區的資料已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫執行緒啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。

6、當溢寫執行緒啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型預設的行為,這裡的排序也是對序列化的位元組做的排序。

如果job設定過Combiner,那麼現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁碟的資料量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。 那哪些場景才能使用Combiner呢?從這裡分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

7、合併溢寫檔案:每次溢寫會在磁碟上生成一個臨時檔案(寫之前判斷是否有combiner),如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個臨時檔案存在。當整個資料處理結束之後開始對磁碟中的臨時檔案進行merge合併,因為最終的檔案只有一個,寫入磁碟,並且為這個檔案提供了一個索引檔案,以記錄每個reduce對應資料的偏移量。 至此map整個階段結束。

mapTask的一些基礎設定配置(mapred-site.xml當中社會):

設定一:設定環型緩衝區的記憶體值大小(預設設定如下) mapreduce.task.io.sort.mb 100

設定二:設定溢寫百分比(預設設定如下) mapreduce.map.sort.spill.percent 0.80

設定三:設定溢寫資料目錄(預設設定) mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local

設定四:設定一次最多合併多少個溢寫檔案(預設設定如下) mapreduce.task.io.sort.factor 10

2、ReduceTask 工作機制以及reduceTask的並行度

在這裡插入圖片描述

Reduce大致分為copy、sort、reduce三個階段,重點在前兩個階段。copy階段包含一個eventFetcher來獲取已完成的map列表,由Fetcher執行緒去copy資料,在此過程中會啟動兩個merge執行緒,分別為inMemoryMerger和onDiskMerger,分別將記憶體中的資料merge到磁碟和將磁碟中的資料進行merge。待資料copy完成之後,copy階段就完成了,開始進行sort階段,sort階段主要是執行finalMerge操作,純粹的sort階段,完成之後就是reduce階段,呼叫使用者定義的reduce函式進行處理。

詳細步驟:

1、Copy階段,簡單地拉取資料。Reduce程序啟動一些資料copy執行緒(Fetcher),通過HTTP方式請求maptask獲取屬於自己的檔案。

2、Merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活。merge有三種形式:記憶體到記憶體;記憶體到磁碟;磁碟到磁碟。預設情況下第一種形式不啟用。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的檔案。

3、合併排序。把分散的資料合併成一個大的資料後,還會再對合並後的資料排序。

4、對排序後的鍵值對呼叫reduce方法,鍵相等的鍵值對呼叫一次reduce方法,每次呼叫會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS檔案中。