1. 程式人生 > >hadoop中的一些重要知識點

hadoop中的一些重要知識點

hadoop之job和shuffle過程

1、job提交流程

1、使用者向YARN中提交應用程式,其中包括ApplicationMaster(AM)程式,啟動AM的命令,使用者程式等。

2、ResourceManger(RM)為該程式分配第一個Container,並與對應的NodeManger通訊,要求它在這個Container中啟動應用程式AM。

3、AM首先向RM註冊,這樣使用者可以直接通過RM檢視應用程式的執行狀態,然後將為各個任務申請資源,並監控它的執行狀態,直到執行結束,重複4--7的步驟。

4、AM採用輪詢的方式通過RPC協議向RM申請和領取資源。

5、一旦AM申請到資源後,便與對應的NM通訊,要求它啟動任務。

6、NM為任務設定好執行環境(包括環境變數、JAR包、二進位制程式等)後,將任務啟動命令寫到一個指令碼中,並通過執行該指令碼啟動任務。

7、各個任務通過某個RPC協議向AM彙報自己的狀態和進度,以讓AM隨時掌握各個任務的執行狀態,從而可以在任務失敗的時候重新啟動任務。

8、應用程式執行完成後,AM向RM登出並關閉自己。

(1)作業提交

第 0 步:client 呼叫 job.waitForCompletion 方法,向整個叢集提交 MapReduce 作業。

第 1 步:client 向 RM 申請一個作業 id。

第 2 步:RM 給 client 返回該 job 資源的提交路徑和作業 id。

第 3 步:client 提交 jar 包、切片資訊和配置檔案到指定的資源提交路徑。

第 4 步:client 提交完資源後,向 RM 申請執行 MrAppMaster。

(2)作業初始化

第 5 步:當 RM 收到 client 的請求後,將該 job 新增到容量排程器中。

第 6 步:某一個空閒的 NM 領取到該 job。

第 7 步:該 NM 建立 Container,併產生 MRAppmaster。

第 8 步:下載 client 提交的資源到本地。

(3)任務分配

第 9 步:MrAppMaster 向 RM 申請執行多個 maptask 任務資源。

第 10 步 :RM 將執行 maptask 任務分配給另外兩個 NodeManager,另兩個 NodeManager

分別領取任務並建立容器。

(4)任務執行

第 11 步:MR 向兩個接收到任務的 NodeManager 傳送程式啟動指令碼,這兩個NodeManager 分別啟動 maptask,maptask 對資料分割槽排序。

第 12 步:MrAppMaster 等待所有 maptask 執行完畢後,向 RM 申請容器,執行 reduce task。

第 13 步:reduce task 向 maptask 獲取相應分割槽的資料。

第 14 步:程式執行完畢後,MR 會向 RM 申請登出自己。

(5)進度和狀態更新

YARN 中的任務將其進度和狀態(包括 counter)返回給應用管理器, 客戶端每秒(通過

mapreduce.client.progressmonitor.pollinterval 設定)嚮應用管理器請求進度更新, 展示給使用者。

(6)作業完成

除了嚮應用管理器請求作業進度外, 客戶端每 5 分鐘都會通過呼叫 waitForCompletion()

來檢查作業是否完成。時間間隔可以通過 mapreduce.client.completion.pollinterval 來設定。作

業完成之後, 應用管理器和 container 會清理工作狀態。作業的資訊會被作業歷史伺服器儲存

以備之後使用者核查。

####resourceManager包括兩個元件:

####1、排程器:負責排程任務執行的順序,僅是排程的作用,不參與任何任務的執行

(1)FIFO先進先出排程器:先來的任務先執行,只有一個佇列

(2)Capacity容量(計算能力)排程器:有多個佇列,為每個佇列分配不同的資源,每個佇列遵循FIFO(hadoop 2中預設的排程模型)

(3)Fair公平排程器:所有任務平分共享資源

####2、工作管理員applicationsManager(ASM):負責任務啟動或停止或失敗啟動

7)等到分配到相關資源之後就排程器會先給應用程式在一個nodemanager中分配一個容器(container),然後在容器中先啟動程式的管理者,用於管理任務的執行進度和完成情況,這個管理者叫APP MASTER(主類是MRAppMaster)。

8)MRappmaster會對作業進行初始化,初始化的過程中會建立多個簿記物件(作業簿),用來跟蹤作業的完成。

9)接下來MRAppMaster會啟動相應的maptask,但是不知道啟動幾個啊,也不知道每一個對應的資料切片資訊,怎麼辦?這時候就會去先訪問共享檔案系統中,獲取本作業的輸入切片資訊。會獲取到本job中有幾個maptask和reducetask。此時mrappmaster會首先進行一個決策,就是如果maptask的數量小於10個,而reducetask的數量只有1個,並且輸入大小小於一個塊的作業,就會選用和MRAppmaster同一個JVM中執行任務,因為此時在別的節點上重新啟動容器和銷燬容器的開銷過大不划算,這種模式就是uber模式。然後MRAppMaster會對每一個分片建立一個maptask物件。

10)如果作業不適合作為uber模式執行,此時MRAppMater就會為maptask和reducetask任務向RM申請資源,先發送maptask的請求,再發送reducetask的請求。注意maptask有資料本地化的侷限。優先資料本地化,其次機架本地化最後任意節點。請求返回相應的節點資訊。

11)RM向MRappMaster返回資源資訊,空閒的資源節點,MRAppmaster就會去相應的nodemanager節點上啟動Container

12)之後會啟動一個YarnChild程序用於執行maptask程式

13)Maptask執行之前會首先去共享檔案系統中下拷貝相應的檔案,包括jar包、job.xml檔案以及job.split分片資訊等,下載到本地

14)啟動相應的maptask任務

15)maptask進行到80%的時候,MRAppmaster會去啟動reduceTask,啟動過程同上述過程

16)資源回收和銷燬

2、MapReduce的shuffle過程

####一、Map端的shuffle

Map端會處理輸入資料併產生中間結果,這個中間結果會寫到本地磁碟,而不是HDFS。每個Map的輸出會先寫到記憶體緩衝區中,當寫入的資料達到設定的閾值時,系統將會啟動一個執行緒將緩衝區的資料寫到磁碟,這個過程叫做spill。

在spill寫入之前,會先進行二次排序,首先根據資料所屬的partition進行排序,然後每個partition中的資料再按key來排序。partition的目是將記錄劃分到不同的Reducer上去,以期望能夠達到負載均衡,以後的Reducer就會根據partition來讀取自己對應的資料。接著執行combiner(如果設定了的話),combiner的本質也是一個Reducer,其目的是對將要寫入到磁碟上的檔案先進行一次處理,這樣,寫入到磁碟的資料量就會減少。最後將資料寫到本地磁碟產生spill檔案(spill檔案儲存在{mapred.local.dir}指定的目錄中,Map任務結束後就會被刪除)。

最後,每個Map任務可能產生多個spill檔案,在每個Map任務完成前,會通過多路歸併演算法將這些spill檔案歸併成一個檔案。至此,Map的shuffle過程就結束了。

####二、Reduce端的shuffle

Reduce端的shuffle主要包括三個階段,copy、sort(merge)和reduce。

首先要將Map端產生的輸出檔案拷貝到Reduce端,但每個Reducer如何知道自己應該處理哪些資料呢?因為Map端進行partition的時候,實際上就相當於指定了每個Reducer要處理的資料(partition就對應了Reducer),所以Reducer在拷貝資料的時候只需拷貝與自己對應的partition中的資料即可。每個Reducer會處理一個或者多個partition,但需要先將自己對應的partition中的資料從每個Map的輸出結果中拷貝過來。

接下來就是sort階段,也成為merge階段,因為這個階段的主要工作是執行了歸併排序。從Map端拷貝到Reduce端的資料都是有序的,所以很適合歸併排序。最終在Reduce端生成一個較大的檔案作為Reduce的輸入。

最後就是Reduce過程了,在這個過程中產生了最終的輸出結果,並將其寫到HDFS上。

個人總結:

shuffle是map產生輸出到reduce消化輸入的過程
一個待處理的文字,客戶端在submit()之前呼叫maptesk進行切片處理,maptask用textinputformat進行reader()讀取,
然後maptask呼叫使用者自定義的mapper進行邏輯處理,輸出<k,v>對,將其寫入到環形緩衝區中,環形緩衝區預設大小為100M 當達到80%時進行溢寫操作,然後再spile階段進行partitiator分割槽,且分割槽內排序,再進行combiner合併操作,
兩兩合併,並進行merge操作,merge操作再次進行歸併排序,reducetesk將多個merge檔案再次進行歸併排序