1. 程式人生 > >Hadoop的MapReduce模型基本原理

Hadoop的MapReduce模型基本原理

font cti 初始化 BE input 並且 HA HR 內存

參考:

https://www.zybuluo.com/frank-shaw/note/206604

一、MapReduce數據處理流程

技術分享圖片

關於上圖,可以做出以下逐步分析:

  1. 輸入數據(待處理)首先會被切割分片,每一個分片都會復制多份到HDFS中。上圖默認的是分片已經存在於HDFS中。
  2. Hadoop會在存儲有輸入數據分片(HDFS中的數據)的節點上運行map任務,可以獲得最佳性能(數據TaskTracker優化,節省帶寬)。
  3. 在運行完map任務之後,可以看到數據並不是存回HDFS中,而是直接存在了本地磁盤上,因為map輸出數據是中間數據,該中間數據由reduce任務處理之後才會產生最終輸出結果,reduce任務完成之後這些數據是要被刪除掉的。
  4. map的輸出結果會在本地進行分區,並進行排序,這是為之後的reduce階段做準備。分區方法常用的是對key值進行Hash轉換之後求模,這樣就可以將相同key值的數據放在同一個分區,reduce階段同一個分區的數據會被安排到同一個reduce中。
  5. 如果有必要,可以再map階段設置combine方法,combine方法與reduce方法的函數體是同一個(做的事是一樣的),只不過combine方法針對的對象只是當前map中key值相同的數據,而reduce方法處理的是所有輸入數據中相同的key對應的數據。也就是說,它只是reduce的一個小分身。這麽做的目的是為了減輕從map階段傳送到reduce階段的IO傳送負擔,也是節省帶寬的一種方式(做了combine優化之後,傳送的數據量會大大減少)。
  6. 每一個reduce都會將所有map對應分區的數據通過IO復制過來,進行合並。合並的過程包含排序的過程,因為要將相同的key值對應的數據統一處理。在reduce計算階段,reduce的輸入鍵是key,而輸入值是相同的key數據對應的value所構成的一個叠代器數據結構。
  7. 經過reduce處理之後,最後的輸出結果就是我們想要的結果。該輸出會存儲在HDFS中,第一塊副本存儲在本地節點上,其他副本存儲在其他機架節點中。進一步,可以將這些輸出結果作為另一個MapRuce任務的輸入,進行更多的任務計算。

二、MapReduce在Hadoop上的具體實現

技術分享圖片

這個實現機制就是MapReduce1,在Hadoop2.x的時候實現機制變成了YARN。了解MapReduce1對於我們理解Hadoop非常有幫助,晚些時候會寫一篇專門關於YARN的文章。

如果細看,可以發現,MapReduce1實現圖其實與一開始的MapReduce的工作流程總體是一致的,只不過多了JobTracker、TaskTracker以及Client這幾個角色。map和reduce任務分配給了多個TaskTracker來執行。這幾個角色非常重要,有必要詳細了解。

關於client

客戶端(client):這個是程序員主要工作的部分,工作分別是編寫mapreduce程序,配置相應的文件信息,提交作業。如果出現錯誤了,需要找出錯誤,修改程序,直到完美運行。

JobTracker與TaskTracker介紹

JobTracker與TaskTracker之間服從的是主從結構。從圖中可以看到:主節點JobTracker只有一個,而從節點TaskTracker有很多個。

JobTracker負責:

  • 接收客戶提交的計算任務
  • 把計算任務分配給TaskTracker執行
  • 監控TaskTracker的執行情況

TaskTrackers負責:

  • 完成JobTracker分配的計算任務

JobTracker與TaskTrackers之間的關系就是項目經理與開發人員的關系。項目經理接到用戶的需求清單,那麽將用戶的需求分配給開發人員來完成。

三、具體實現機制

了解了如何從MapReduce遷移到JobTracker TaskTrackers之後,我們來詳細講講其中的實現機制(下面講到的每一點都對應圖上的相應數字):

技術分享圖片

作業的提交

1.寫好的一個MapReduce程序就是一個job。點擊運行。此時會生成一個JobClient,它會做一系列的準備工作,當準備工作做好了之後,才會向JobTracker提交任務。

2.JobClient向JobTracker請求一個新的job ID。與此同時,JobClient會先做如下檢查:

  • 檢查作業的輸出目錄,如果未指定或已存在則不提交作業並拋錯誤給程序;
  • 檢查輸入目錄是否存在,如果不存在同樣拋出錯誤;如果存在,JobTracker會根據輸入計算輸入分片(Input Split)並生成分片,如果分片計算不出來也會拋出錯誤。

3.JobClient將運行作業所需要的資源(包括作業jar文件,配置文件和計算所得的輸入分片)復制到JobTracker的文件系統中以job ID命名的目錄下(即HDFS中)。值得註意的是,作業jar副本較多(默認mapred.submit.replication = 10)。

4.上面的準備工作做好了之後,它會給JobTracker提交任務(它會告知JobTracker:大哥,我們這邊準備好了,隨時可以戰鬥。哎呀呀,逗比了)。

作業的初始化

5.JobTracker接收到作業提交信息後,將其放入內部隊列,交由job scheduler進行調度,並對其進行初始化(初始化就是創建一個正在運行的job對象(封裝任務和記錄信息),以便JobTracker跟蹤job的狀態和進程)。

6.初始化完畢後,作業調度器會獲取輸入分片信息(input split),每個分片創建一個map任務。關於分片的數量問題(即map數量),前面已經有提及。關於reduce數量,則是由用戶在配置文件裏指定的。

除了map和reduce任務,還有setupJob和cleanupJob需要建立:由每一個TaskTrackers在所有map開始前和所有reduce結束後分別執行。setupJob()創建輸出目錄和任務的臨時工作目錄,cleanupJob()刪除臨時工作目錄。

作業的分配

7.每個TaskTracker定期發送心跳給JobTracker,告知自己還活著,並附帶消息說明自己是否已準備好接受新任務。JobTracker以此來分配任務,並使用心跳的返回值與TaskTracker通信。JobTracker利用調度算法先選擇一個job然後再選此job的一個task分配給TaskTracker.

每個TaskTracker會有固定數量的map和reduce任務槽,數量有TaskTracker核的數量和內存大小來決定。JobTracker會先將TaskTracker的所有的map槽填滿,然後才填此TaskTracker的reduce任務槽。
JobTracker分配map任務時會選取與輸入分片最近的TaskTracker,即數據TaskTracker優化。在分配reduce任務用不著考慮數據TaskTracker。

任務的執行

8.TaskTracker分配到一個任務後,首先從HDFS中把作業的jar文件及運行所需要的全部文件(DistributedCache設置的)復制到TaskTracker**本地**。接下來TaskTracker為任務新建一個本地工作目錄,並把jar文件的內容解壓到這個文件夾下(此時需要用到的就是前面提及的setupJob(),其作用是創建輸出目錄和任務的臨時工作目錄)。

9.TaskTracker新建一個taskRunner實例來運行該任務。

10.TaskRunner啟動一個新的JVM來運行每個任務。

此時圖中顯示的所有動作都已經寫下來了。只不過還有一些細節需要把握。請看下面:

進度和狀態的更新

Child JVM有獨立的線程每隔3秒檢查任務更新標誌,如果有更新就會報告給此TaskTracker;
TaskTracker每隔5秒給JobTracker發心跳;(當然這個時間可以設置)
job tracker合並這些更新,產生一個表明所有運行作業及其任務狀態的全局視圖。
JobClient.monitorAndPrintJob()每秒查詢這些信息。

作業的完成

當JobTracker收到最後一個任務(this will be the special job cleanup task)的完成報告後,便把job狀態設置為successful。Job得到完成信息便從waitForCompletion()返回。
最後,JobTracker清空作業的工作狀態,並指示TaskTracker也清空作業的工作狀態(如刪除中間輸出)。

失敗處理機制

分布式計算過程中節點失敗是很常見的。作為一個成熟的實現機制,應該有一套完善的失敗處理機制。

在Hadoop的MapReduce1架構中常見失敗有三種:任務失敗、TaskTracker失敗、JobTracker失敗。

任務失敗

  • 子任務失敗。當map或者reduce子任務中的代碼拋出異常,JVM進程會在退出之前向服進程TaskTracker進程發送錯誤報告,TaskTracker會將此(任務嘗試)task attempt標記為failed狀態,釋放一個槽以便運行另外一個任務。
  • jvm失敗。JVM突然退出,即JVM錯誤,這時TaskTracker會註意到進程已經退出,標記為failed。

值得註意的是:
1)任務失敗有重試機制,重試次數map任務設置是mapred.map.max.attempts屬性控制,reduce是mapred.reduce.max.attempts屬性控制。
2)一些job可以完成任務總體的一部分就能夠接受,這個百分比由mapred.map.failures.precent和mapred.reduce.failures.precent參數控制。
3)任務嘗試(task attempt)是可以中止(killed)的。

TaskTracker失敗

作業運行期間,TaskTracker會通過心跳機制不斷與系統JobTracker通信,如果某個TaskTracker運行緩慢、失敗或者出現故障,TaskTracker就會停止或者很少向JobTracker發送心跳,JobTracker會註意到此TaskTracker發送心跳的情況,從而將此TaskTracker從等待任務調度的TaskTracker池中移除。

由於TaskTracker中包含有一定數量的map和reduce子任務,這個時候這些子任務怎麽處理呢?

1) 如果是map並且成功完成的話, JobTracker會安排此TaskTracker上一成功運行的map任務返回。
2) 如果是reduce並且成功的話,數據直接使用,因為reduce只要執行完了的就會把輸出寫到HDFS上。
3) 如果他們屬於未完成的作業的話,reduce階段無法獲取該TaskTracker上的本地map輸出文件,任何任務都需要重新調度。

另外,即使TaskTracker沒有失敗,如果其上的失敗子任務遠遠高於集群的平均失敗子任務數,也會被列入黑名單。可以通過重啟從JobTracker的黑名單移除。

###jobtracker失敗
jobtracker失敗應該說是最嚴重的一種失敗方式了,而且在Hadoop中存在單點故障的情況下是相當嚴重的,因為在這種情況下作業會最終失敗,盡管這種故障的概率極小。

Hadoop的MapReduce模型基本原理