1. 程式人生 > >Hadoop MapReduce八大步驟以及Yarn工作原理詳解

Hadoop MapReduce八大步驟以及Yarn工作原理詳解

Hadoop是市面上使用最多的大資料分散式檔案儲存系統和分散式處理系統, 其中分為兩大塊分別是hdfs和MapReduce, hdfs是分散式檔案儲存系統, 借鑑了Google的GFS論文. MapReduce是分散式計算處理系統, 借鑑了Google的MapReduce論文. 本文著重來梳理下新版也就是2.3後的Hadoop的MapReduce部分, 也就是Yarn框架, 以及MapReduce的八大步驟的詳細工作.

新老MapReduce的介紹和對比

老版的MapReduce介紹

老版的MapReduce分為兩個部分: JobTracker和TaskTracker.

  • JobTracker負責接收客戶端提交的任務請求, 分配系統資源, 分配任務給TaskTracker, 管理任務的失敗/重啟等操作.
  • TaskTracker負責接收並執行JobTracker分配的任務, 並與JobTracker保持心跳機制, 向JobTracker報告自己任務的執行狀況.

老版的MapReduce存在問題

  • 所有的任務都在JobTracker上面進行分配, 排程和監控, 處理. 造成了過多的資源消耗, 當job比較多的時候, 增大了JobTracker機器fail的風險.
  • JobTracker 是 Map-reduce 的集中處理點,存在單點故障。
  • 在 TaskTracker 端,以 map/reduce task 的數目作為資源的表示過於簡單,沒有考慮到 cpu/ 記憶體的佔用情況,如果兩個大記憶體消耗的 task 被排程到了一塊,很容易出現 OOM。
  • 在 TaskTracker 端,把資源強制劃分為 map task slot 和 reduce task slot, 如果當系統中只有 map task 或者只有 reduce task 的時候,會造成資源的浪費,也就是前面提過的叢集資源利用的問題。

新版的MapReduce介紹

新版的MapReduce也叫作Yarn框架, 其最重要的重構在於將資源分配與任務排程/監控進行了分離.

  • ResourceManager: 資源排程器

    首先保持和NodeManager的心跳機制, 接受客戶端的任務請求, 根據NodeManager報告的資源情況, 啟動排程任務, 分配Container給ApplicationMaster, 監控AppMaster的存在情況, 負責作業和資源分配排程, 資源包括CPU, 記憶體, 磁碟, 網路等, 它不參與具體任務的分配和監控, 也不能管理具體任務的失敗和重啟等.

  • ApplicationMaster: 工作管理員

    在其中一臺Node機器上, 負責一個Job的整個生命週期. 包括任務的分配排程, 任務的失敗和重啟管理, 具體任務的所有工作全部由ApplicationMaster全權管理, 就像一個大管家一樣, 只向ResourceManager申請資源, 向NodeManager分配任務, 監控任務的執行, 管理任務的失敗和重啟.

  • NodeManager: 任務處理器

    負責處理ApplicationMaster分配的任務, 並保持與ResourceManager的心跳機制, 監控資源的使用情況並向RM報告進行報告, 支援RM的資源分配工作.

MapReduce工作的八大步驟詳解

MapReduce的思想

MapReduce最重要的一個思想: 分而治之. 就是將負責的大任務分解成若干個小任務, 並行執行. 完成後在合併到一起. 適用於大量複雜的任務處理場景, 大規模資料處理場景.

Map負責“分”,即把複雜的任務分解為若干個“簡單的任務”來並行處理。可以進行拆分的前提是這些小任務可以平行計算,彼此間幾乎沒有依賴關係。

Reduce負責“合”,即對map階段的結果進行全域性彙總。 在這裡插入圖片描述

從上圖可以看出來, 在客戶端提交計算任務後, 首先要讀取這個檔案, 預設情況下, 一個block塊就會有一個Maptask進行處理. 處理完成後通過Shuffle階段, 經過一系列分割槽, 排序, 規約, 分組, 進入reduce端, reduce端經過處理後, output輸出處理結果即可. 這就是MapReduce的一個大致工作流程.

MapReduce的八大步驟:

Map階段:

  • 第一步: 通過FileInputFormat讀取檔案, 解析檔案成為key, value對, 輸出到第二步.
  • 第二步: 自定義Map邏輯, 處理key1, value1, 將其轉換為key2, value2, 輸出到第三步.

Shuffle階段:

  • 第三步: 對key2, value2進行分割槽.
  • 第四步: 對不同分割槽內的資料按照相同的key進行排序.
  • 第五步: 分組後的資料進行規約(combine操作),降低資料的網路拷貝(可選步驟)
  • 第六步: 對排序後的資料, 將相同的key的value資料放入一個集合中, 作為value2.

Reduce階段:

  • 第七步: 對多個map的任務進行合併, 排序. 自定義reduce邏輯, 處理key2, value2, 將其轉換為key3, value3, 進行輸出.
  • 第八步: 通過FileOutputFormat輸出處理後的資料, 儲存到檔案. 在這裡插入圖片描述

MapTask執行機制詳解

整個MapTask的簡要概述:

首先一個檔案被split邏輯切分成了多個split檔案(切片), 通過FileInputFormat的RecordReader按行(也可以自定義)讀取內容給map進行處理, 資料被map處理結束後交給OutputCollector收集器, 對其結果key進行分割槽 (預設使用Hash分割槽), 然後寫入記憶體緩衝區(buffer), 每個MapTask都有一個記憶體緩衝區, 收集map處理結果, 緩衝區很小需要重複利用, 每次緩衝區快滿的時候就會將臨時檔案寫入到磁碟中, 在記憶體緩衝區中會進行排序, 規約, 當整個MapTask任務結束後, 合併這些磁碟中的臨時檔案, 生成最終的輸出檔案, 等待reduceTask拉取.

詳細步驟:

  • 首先,讀取資料元件InputFormat(預設TextInputFormat)會通過getSplits方法對輸入目錄中檔案進行邏輯切片規劃得到splits,有多少個split就對應啟動多少個MapTask。split與block的對應關係預設是一對一。

  • 將輸入檔案切分為splits之後,由RecordReader物件(預設LineRecordReader)進行讀取,以\n作為分隔符,讀取一行資料,返回<key,value>。Key表示每行首字元偏移值,value表示這一行文字內容。

  • 讀取split返回<key,value>,進入使用者自己繼承的Mapper類中,執行使用者重寫的map函式。RecordReader讀取一行這裡呼叫一次。

  • map邏輯完之後,將map的每條結果通過context.write進行collect資料收集。在collect中,會先對其進行分割槽處理,預設使用HashPartitioner。

    分割槽的作用就是根據key或value及reduce的數量來決定當前的這對輸出資料最終應該交由哪個reduce task處理。預設對key hash後再以reduce task數量取模。預設的取模方式只是為了平均reduce的處理能力,如果使用者自己對Partitioner有需求,可以訂製並設定到job上。

  • 接下來,會將資料寫入記憶體,記憶體中這片區域叫做環形緩衝區,緩衝區的作用是批量收集map結果,減少磁碟IO的影響。我們的key/value對以及Partition的結果都會被寫入緩衝區。當然寫入之前,key與value值都會被序列化成位元組陣列。

    環形緩衝區其實是一個數組,陣列中存放著key、value的序列化資料和key、value的元資料資訊,包括partition、key的起始位置、value的起始位置以及value的長度。環形結構是一個抽象概念。

    緩衝區是有大小限制,預設是100MB。當map task的輸出結果很多時,就可能會撐爆記憶體,所以需要在一定條件下將緩衝區中的資料臨時寫入磁碟,然後重新利用這塊緩衝區。

    這個從記憶體往磁碟寫資料的過程被稱為Spill,譯為溢寫。這個溢寫是由單獨執行緒來完成,不影響往緩衝區寫map結果的執行緒。所以整個緩衝區有個溢寫的比例, 這個比例預設是0.8,也就是當緩衝區的資料已經達到閾值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢寫執行緒啟動,鎖定這80MB的記憶體,執行溢寫過程。Map task的輸出結果還可以往剩下的20MB記憶體中寫,互不影響。

  • 當溢寫執行緒啟動後,需要對這80MB空間內的key做排序(Sort)。排序是MapReduce模型預設的行為,這裡的排序也是對序列化的位元組做的排序。如果job設定過Combiner,那麼現在就是使用Combiner的時候了。將有相同key的key/value對的value加起來,減少溢寫到磁碟的資料量。Combiner會優化MapReduce的中間結果,所以它在整個模型中會多次使用。

    那哪些場景才能使用Combiner呢?從這裡分析,Combiner的輸出是Reducer的輸入,Combiner絕不能改變最終的計算結果。Combiner只應該用於那種Reduce的輸入key/value與輸出key/value型別完全一致,且不影響最終結果的場景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它對job執行效率有幫助,反之會影響reduce的最終結果。

  • 合併溢寫檔案:Ø每次溢寫會在磁碟上生成一個臨時檔案(寫之前判斷是否有combiner),如果map的輸出結果真的很大,有多次這樣的溢寫發生,磁碟上相應的就會有多個臨時檔案存在。當整個資料處理結束之後開始對磁碟中的臨時檔案進行merge合併,因為最終的檔案只有一個,寫入磁碟,並且為這個檔案提供了一個索引檔案,以記錄每個reduce對應資料的偏移量。

ReduceTask執行機制詳解

簡要概述: Reduce大致分為copy、sort、reduce三個階段,重點在前兩個階段。reduceTask會啟動Fetcher執行緒去Copy屬於自己的的檔案, 首先將檔案放入記憶體緩衝區, 當copy來的檔案到達一定閾值, 就會合並檔案到磁碟, 然後在磁碟中生成了眾多的溢寫檔案。直到沒有map端的資料時才結束, 然後合併磁碟中的檔案, 生成最終的檔案.

詳細步驟:

  • Copy階段,簡單地拉取資料。Reduce程序啟動一些資料copy執行緒(Fetcher),通過HTTP方式請求maptask獲取屬於自己的檔案。
  • Merge階段。這裡的merge如map端的merge動作,只是陣列中存放的是不同map端copy來的數值。Copy過來的資料會先放入記憶體緩衝區中,這裡的緩衝區大小要比map端的更為靈活。merge有三種形式:記憶體到記憶體;記憶體到磁碟;磁碟到磁碟。預設情況下第一種形式不啟用。當記憶體中的資料量到達一定閾值,就啟動記憶體到磁碟的merge。與map 端類似,這也是溢寫的過程,這個過程中如果你設定有Combiner,也是會啟用的,然後在磁碟中生成了眾多的溢寫檔案。第二種merge方式一直在執行,直到沒有map端的資料時才結束,然後啟動第三種磁碟到磁碟的merge方式生成最終的檔案。
  • 合併排序。把分散的資料合併成一個大的資料後,還會再對合並後的資料排序。
  • 對排序後的鍵值對呼叫reduce方法,鍵相等的鍵值對呼叫一次reduce方法,每次呼叫會產生零個或者多個鍵值對,最後把這些輸出的鍵值對寫入到HDFS檔案中。

MapReduce Shuffle過程

map階段處理的資料如何傳遞給reduce階段,是MapReduce框架中最關鍵的一個流程,這個流程就叫shuffle。

shuffle: 洗牌、發牌——(核心機制:資料分割槽,排序,分組,規約,合併等過程)。

shuffle是Mapreduce的核心,它分佈在Mapreduce的map階段和reduce階段。一般把從Map產生輸出開始到Reduce取得資料作為輸入之前的過程稱作shuffle。

  • Collect階段:將MapTask的結果輸出到預設大小為100M的環形緩衝區,儲存的是key/value,Partition分割槽資訊等。
  • Spill階段:當記憶體中的資料量達到一定的閥值的時候,就會將資料寫入本地磁碟,在將資料寫入磁碟之前需要對資料進行一次排序的操作,如果配置了combiner,還會將有相同分割槽號和key的資料進行排序。
  • Merge階段:把所有溢位的臨時檔案進行一次合併操作,以確保一個MapTask最終只產生一箇中間資料檔案。
  • Copy階段:ReduceTask啟動Fetcher執行緒到已經完成MapTask的節點上覆制一份屬於自己的資料,這些資料預設會儲存在記憶體的緩衝區中,當記憶體的緩衝區達到一定的閥值的時候,就會將資料寫到磁碟之上。
  • Merge階段:在ReduceTask遠端複製資料的同時,會在後臺開啟兩個執行緒對記憶體到本地的資料檔案進行合併操作。
  • Sort階段:在對資料進行合併的同時,會進行排序操作,由於MapTask階段已經對資料進行了區域性的排序,ReduceTask只需保證Copy的資料的最終整體有效性即可。

Shuffle中的緩衝區大小會影響到mapreduce程式的執行效率,原則上說,緩衝區越大,磁碟io的次數越少,執行速度就越快。緩衝區的大小可以通過引數調整, 引數:mapreduce.task.io.sort.mb 預設100M. 在這裡插入圖片描述

今天真是個傷心的日子, 因為趙麗穎和馮紹峰發微博宣告結婚了, 我的又一個女神嫁出去了. 好像是男人的通病, 看著女神出嫁總有點心裡悵然若失…不過, 還是祝福吧… 對了, 為啥說又一個女神呢? 因為我的上一個女神是安以軒…