1. 程式人生 > >MapReduce作業提交流程

MapReduce作業提交流程

Job Submission

1.客戶端呼叫job.submit方法提交作業,該方法內部建立一個JobSubmitter物件例項,該例項物件呼叫submitJobInternal方法提交作業。當作業成功提交後,客戶端呼叫的waitForCompletion方法將一直詢問作業的進度資訊並列印。

作業提交的內部處理過程:

  1. 首先通過RPC呼叫向 resource manager申請一個Application ID,也就是MapReduce作業的ID。

  2. 檢查作業的輸出目錄是否存在,如果存在,作業將不會提交,任務執行失敗。

  3. 計算作業輸入的splits,如果計算失敗,作業將不會提交,任務執行失敗。

  4. 將作業的splits資訊,執行jar包,配置檔案等拷貝到分散式共享檔案系統上,如HDFS上(有備份)。

  5. 提交作業,呼叫submitApplication方法將作業提交到resource manager。

Job Initialization

一旦resource manager 接收到submitApplication呼叫訊息,就將該作業提交請求轉交給任務排程器,任務排程器首先在Node manager上分配一個container,並且resource manager在該Node manager上啟動作業的application master程序。application master也是一個java應用,它的主函式為 MRAppMaster,在它初始化的過程中,首先會建立一些記錄物件來記錄作業執行過程中的資訊,然後它通過共享檔案系統獲取splits資訊,對每個split啟動一個map task,每個map task會分配一個task id。

uber task

在application master啟動task之前,它首先會進行判斷,是否需要使用分散式的方式來啟動task,當申請container來並行執行各個task的時間大於序列執行時間的時候,將直接在當前jvm下序列執行該任務。一般該條件是:map task少於十個,reduce只有1個,輸入檔案不超過一個block。

在啟動任務之前,application master還會呼叫 OutputCommitter的 setupJob方法來建立輸出目錄,以便task把臨時資料輸出放到該目錄。

Task Assignment

如果任務是分散式執行的話,application master將向resource manager申請執行各個task的container,一般來說,申請map task的優先順序要高於reduce task。但有一點要注意,reduce task可以執行在任何節點,而map task因為有資料本地性策略的要求,所以它們往往限制在資料塊所在的節點上執行。每個container預設分配1024M記憶體,1個虛擬核。

Task Execution

一旦task所需的資源被分配後,application master將和node manager通訊,啟動task,在task啟動之前,它們會從共享檔案系統上下載相應的jar,配置檔案和分散式快取檔案等。每個task的執行緒名稱為 YarnChild,它執行在jvm中,它的失敗不會導致node manager的宕機。

Job Completion

當作業最後一個task完成後,application master收到通知,將任務標記為successful,當client詢問job資訊時,知道該作業已完成,waitForCompletaion將不再阻塞,列印結果資訊。最後,清理 application master和task的container資源,將任務資訊打包,儲存為歷史記錄以便後期查詢。