1. 程式人生 > >MapReduce程式的讀寫過程

MapReduce程式的讀寫過程






問題導讀

1、HDFS框架組成是什麼?

2、HDFS檔案的讀寫過程是什麼?

3、MapReduce框架組成是什麼?

4、MapReduce工作原理是什麼?

5、什麼是Shuffle階段和Sort階段?
MapReduce程式的工作過程-mapreduce工作過程

1.gif (12.87 KB, 下載次數: 4)



下載附件

 儲存到相簿



2015-10-8 20:58 上傳










HDFS採用master/slaver的主從架構,一個HDFS叢集包括一個NameNode節點(主節點)和多個DataNode節點(從節點),並提供應用程式的訪問介面。NameNode,DataNode和Client的解釋,如下所示:

  • NameNode負責檔案系統名字空間的管理與維護,同時負責客戶端檔案操作(比如開啟,關閉,重新命名檔案或目錄等)的控制及具體儲存任務的管理與分配(比如確定資料塊到具體DataNode節點的對映等);

  • DataNode負責處理檔案系統客戶端的讀寫請求,提供真實檔案資料的儲存服務;

  • Client是客戶端,一般指的是訪問HDFS介面的應用程式,或者HDFS的Web服務(就是讓使用者通過瀏覽器來檢視HDFS的執行狀況)等。





1. 檔案的讀取

Client與之互動的HDFS、NameNode、DataNode檔案的讀取流程,如下所示:
MapReduce程式的工作過程-mapreduce程式

2.jpg (37.74 KB, 下載次數: 3)



下載附件

 儲存到相簿



2015-10-8 20:58 上傳









  • Client向遠端的NameNode發起RPC請求;(1)

  • NameNode會返回檔案的部分或者全部Block列表,對於每個Block,NameNode都會返回該Block副本的DataNode地址;(2)

  • Client會選擇與其最接近的DataNode來讀取Block,如果Client本身就是DataNode,那麼將從本地直接讀取資料;(3)

  • 讀完當前Block後,關閉與當前的DataNode連線,併為讀取下一個Block尋找最近的DataNode;(4)

  • 讀完Block列表後,並且檔案讀取還沒有結束,Client會繼續向NameNode獲取下一批Block列表;(5)

  • 讀完一個Block都會進行Cheeksum驗證,如果讀取DataNode時出現錯誤,Client會通知NameNode,然後從該Block的另外一個最近鄰DataNode繼續讀取資料。Client讀取資料完畢之後,關閉資料流。(6)





2. 檔案的寫入

Client與之互動的HDFS、NameNode、DataNode檔案的寫入流程,如下所示:
MapReduce程式的工作過程-mapreduce過程

3.jpg (35.93 KB, 下載次數: 2)



下載附件

 儲存到相簿



2015-10-8 20:58 上傳









  • Client向遠端的NameNode發起RPC請求;(1)

  • NameNode便會檢查要建立的檔案是否已經存在,建立者是否有許可權進行操作等,如果滿足相關條件,就會建立檔案,否則會讓Client丟擲異常;(2)

  • 在Client開始寫入檔案的時候,開發庫(即DFSOutputStream)會將檔案切分成一個個的資料包,並寫入”資料佇列“,然後向NameNode申請新的Block,從而得到用來儲存複本(預設為3)的合適的DataNode列表,每個列表的大小根據NameNode中對replication的設定而定;(3)

  • 首先把一個數據包以流的方式寫入第一個DataNode,其次將其傳遞給在此管線中的下一個DataNode,然後直到最後一個DataNode,這種寫資料的方式呈流水線的形式;(假設複本為3,那麼管線由3個DataNode節點構成,即Pipeline of datanodes)(4)

  • 當最後一個DataNode完成之後,就會返回一個確認包,在管線裡傳遞至Client,開發庫(即DFSOutputStream)也維護著一個”確認佇列”,當成功收到DataNode發回的確認包後便會從“確認佇列”中刪除相應的包;(5)

  • 如果某個DataNode出現了故障,那麼DataNode就會從當前的管線中刪除,剩下的Block會繼續在餘下的DataNode中以管線的形式傳播,同時NameNode會再分配一個新的DataNode,以保持replication設定的數量。Client寫入資料完畢之後,關閉資料流。(6)





說明:HDFS預設Block的大小為64M,提供SequenceFile和MapFile二種型別的檔案。

二. MapReduce框架組成

MapReduce框架的主要組成部分和它們之間的相關關係,如下所示:
MapReduce程式的工作過程-mapreduce過程詳解

4.jpg (40.03 KB, 下載次數: 10)



下載附件

 儲存到相簿



2015-10-8 20:58 上傳









上述過程包含4個實體,各實體的功能,如下所示:

  • Client:提交的MapReduce作業,比如,寫的MR程式,或者CLI執行的命令等;

  • JobTracker:協調作業的執行,本質是一個管理者;

  • TaskTracker:執行作業劃分後的任務,本質就是一個執行者;

  • HDFS:用來在叢集間共享儲存的一種抽象檔案系統。





直觀來說,NameNode就是一個元資料倉庫,就像Windows中的登錄檔一樣。SecondaryNameNode可以看成NameNode的備份。DataNode可以看成是用來儲存作業劃分後的任務。在通常搭建的3臺Hadoop分散式叢集中,Master是NameNode,SecondaryNameNode,JobTracker,其它2臺Slaver都是TaskTracker,DataNode,並且TaskTracker都需要執行在HDFS的DataNode上面。

上述用到的類,或者程序的功能,如下所示:

  • Mapper和Reducer

    基於Hadoop的MapReduce應用程式最今本的組成部分包括:一個Mapper抽象類,一個Reducer抽象類,一個建立JobConf的執行程式。

  • JobTracker

    JobTracker屬於master,一般情況應該部署在單獨的機器上,它的功能就是接收Job,負責排程Job的每一個子任務Task執行在TaskTracker上,並且監控它們,如果發現有失敗的Task就重啟它即可。

  • TaskTracker

    TaskTracker是運行於多節點的slaver服務,它的功能是主動通過心跳與JobTracker進行通訊接收作業,並且負責執行每一個任務。

  • JobClient

    JobClient的功能是在Client提交作業後,把一些檔案上傳到HDFS,比如作業的jar包(包括應用程式以及配置引數)等,並且把路徑提交到JobTracker,然後由JobTracker建立每一個Task(即MapTask和ReduceTask)並將它們分別傳送到各個TaskTracker上去執行。

  • JobInProgress

    JobClient提交Job後,JobTracker會建立一個JobInProgress來跟蹤和排程這個Job,並且把它新增到Job佇列中。JobInProgress根據提交的Job Jar中定義的輸入資料集(已分解成FileSplit)建立對應的一批TaskInProgress1用於監控和排程Task。

  • TaksInProgress2

    JobTracker通過每一個TaskInProgress1來執行Task,這時會把Task物件(即MapTask和ReduceTask)序列化寫入相應的TaskTracker中去,TaskTracker會建立對應的TaskInProgress2用於監控和排程該MapTask和ReduceTask。

  • MapTask和ReduceTask

    Mapper根據Job Jar中定義的輸入資料<key1, value1>讀入,生成臨時的<key2, value2>,如果定義了Combiner,MapTask會在Mapper完成後呼叫該Combiner將相同Key的值做合併處理,目的是為了減少輸出結果。MapTask全部完成後交給ReduceTask程序呼叫Reducer處理,生成最終結果<key3, value3>。具體過程可以參見[4]。




三. MapReduce工作原理

整個MapReduce作業的工作工程,如下所示:
MapReduce程式的工作過程-mapreduce程式編寫

5.jpg (47.39 KB, 下載次數: 3)



下載附件

 儲存到相簿



2015-10-8 20:59 上傳









1. 作業的提交

JobClient的submitJob()方法實現的作業提交過程,如下所示:

  • 通過JobTracker的getNewJobId()請求一個新的作業ID;(2)

  • 檢查作業的輸出說明(比如沒有指定輸出目錄或輸出目錄已經存在,就丟擲異常);

  • 計算作業的輸入分片(當分片無法計算時,比如輸入路徑不存在等原因,就丟擲異常);

  • 將執行作業所需的資源(比如作業Jar檔案,配置檔案,計算所得的輸入分片等)複製到一個以作業ID命名的目錄中。(叢集中有多個副本可供TaskTracker訪問)(3)

  • 通過呼叫JobTracker的submitJob()方法告知作業準備執行。(4)





2. 作業的初始化

  • JobTracker接收到對其submitJob()方法的呼叫後,就會把這個呼叫放入一個內部佇列中,交由作業排程器(比如先進先出排程器,容量排程器,公平排程器等)進行排程;(5)

  • 初始化主要是建立一個表示正在執行作業的物件——封裝任務和記錄資訊,以便跟蹤任務的狀態和程序;(5)

  • 為了建立任務執行列表,作業排程器首先從HDFS中獲取JobClient已計算好的輸入分片資訊(6)。然後為每個分片建立一個MapTask,並且建立ReduceTask。(Task在此時被指定ID,請區分清楚Job的ID和Task的ID)。





3. 任務的分配

  • TaskTracker定期通過“心跳”與JobTracker進行通訊,主要是告知JobTracker自身是否還存活,以及是否已經準備好執行新的任務等;(7)

  • JobTracker在為TaskTracker選擇任務之前,必須先通過作業排程器選定任務所在的作業;

  • 對於MapTask和ReduceTask,TaskTracker有固定數量的任務槽(準確數量由TaskTracker核的數量和記憶體大小來決定)。JobTracker會先將TaskTracker的MapTask填滿,然後分配ReduceTask到TaskTracker;

  • 對於MapTrask,JobTracker通過會選取一個距離其輸入分片檔案最近的TaskTracker。對於ReduceTask,因為無法考慮資料的本地化,所以也沒有什麼標準來選擇哪個TaskTracker。





4. 任務的執行

  • TaskTracker分配到一個任務後,通過從HDFS把作業的Jar檔案複製到TaskTracker所在的檔案系統(Jar本地化用來啟動JVM),同時TaskTracker將應用程式所需要的全部檔案從分散式快取複製到本地磁碟;(8)

  • TaskTracker為任務新建一個本地工作目錄,並把Jar檔案中的內容解壓到這個資料夾中;

  • TaskTracker啟動一個新的JVM(9)來執行每個Task(包括MapTask和ReduceTask),這樣Client的MapReduce就不會影響TaskTracker守護程序(比如,導致崩潰或掛起等);

  • 子程序通過umbilical介面與父程序進行通訊,Task的子程序每隔幾秒便告知父程序它的進度,直到任務完成。





5. 程序和狀態的更新

一個作業和它的每個任務都有一個狀態資訊,包括作業或任務的執行狀態,Map和Reduce的進度,計數器值,狀態訊息或描述(可以由使用者程式碼來設定)。這些狀態資訊在作業期間不斷改變,它們是如何與Client通訊的呢?
MapReduce程式的工作過程-mapreduce工作過程

6.jpg (37.92 KB, 下載次數: 3)



下載附件

 儲存到相簿



2015-10-8 20:59 上傳









  • 任務在執行時,對其進度(即任務完成的百分比)保持追蹤。對於MapTask,任務進度是已處理輸入所佔的比例。對於ReduceTask,情況稍微有點複雜,但系統仍然會估計已處理Reduce輸入的比例;

  • 這些訊息通過一定的時間間隔由Child JVM—>TaskTracker—>JobTracker匯聚。JobTracker將產生一個表明所有執行作業及其任務狀態的全域性檢視。可以通過Web UI檢視。同時JobClient通過每秒查詢JobTracker來獲得最新狀態,並且輸出到控制檯上。





6. 作業的完成

當JobTracker收到作業最後一個任務已完成的通知後,便把作業的狀態設定為"成功"。然後,在JobClient查詢狀態時,便知道作業已成功完成,於是JobClient列印一條訊息告知使用者,最後從runJob()方法返回。

四. Shuffle階段和Sort階段

Shuffle階段是指從Map的輸出開始,包括系統執行排序以及傳送Map輸出到Reduce作為輸入的過程。Sort階段是指對Map端輸出的Key進行排序的過程。不同的Map可能輸出相同的Key,相同的Key必須傳送到同一個Reduce端處理。Shuffle階段可以分為Map端的Shuffle和Reduce端的Shuffle。Shuffle階段和Sort階段的工作過程,如下所示:
MapReduce程式的工作過程-mapreduce程式

7.jpg (61.79 KB, 下載次數: 3)



下載附件

 儲存到相簿



2015-10-8 20:59 上傳









如果說以上是從物理實體的角度來講解MapReduce的工作原理,那麼以上便是從邏輯實體的角度來講解MapReduce的工作原理,如下所示:

1. Map端的Shuffle

  • Map函式開始產生輸出時,並不是簡單地把資料寫到磁碟,因為頻繁的磁碟操作會導致效能嚴重下降。它的處理過程更復雜,資料首先寫到記憶體中的一個緩衝區,並做一些預排序,以提升效率;

  • 每個MapTask都有一個用來寫入輸出資料的迴圈記憶體緩衝區(預設大小為100MB),當緩衝區中的資料量達到一個特定閾值時(預設是80%)系統將會啟動一個後臺執行緒把緩衝區中的內容寫到磁碟(即spill階段)。在寫磁碟過程中,Map輸出繼續被寫到緩衝區,但如果在此期間緩衝區被填滿,那麼Map就會阻塞直到寫磁碟過程完成;

  • 在寫磁碟前,執行緒首先根據資料最終要傳遞到的Reducer把資料劃分成相應的分割槽(partition)。在每個分割槽中,後臺執行緒按Key進行排序(快速排序),如果有一個Combiner(即Mini Reducer)便會在排序後的輸出上執行;

  • 一旦記憶體緩衝區達到溢位寫的閾值,就會建立一個溢位寫檔案,因此在MapTask完成其最後一個輸出記錄後,便會有多個溢位寫檔案。在在MapTask完成前,溢位寫檔案被合併成一個索引檔案和資料檔案(多路歸併排序)(Sort階段);

  • 溢位寫檔案歸併完畢後,Map將刪除所有的臨時溢位寫檔案,並告知TaskTracker任務已完成,只要其中一個MapTask完成,ReduceTask就開始複製它的輸出(Copy階段);

  • Map的輸出檔案放置在執行MapTask的TaskTracker的本地磁碟上,它是執行ReduceTask的TaskTracker所需要的輸入資料,但是Reduce輸出不是這樣的,它一般寫到HDFS中(Reduce階段)。





2. Reduce端的Shuffle

  • Copy階段:Reduce程序啟動一些資料copy執行緒,通過HTTP方式請求MapTask所在的TaskTracker以獲取輸出檔案。

  • Merge階段:將Map端複製過來的資料先放入記憶體緩衝區中,Merge有3種形式,分別是記憶體到記憶體,記憶體到磁碟,磁碟到磁碟。預設情況下第一種形式不啟用,第二種Merge方式一直在執行(spill階段)直到結束,然後啟用第三種磁碟到磁碟的Merge方式生成最終的檔案。

  • Reduce階段:最終檔案可能存在於磁碟,也可能存在於記憶體中,但是預設情況下是位於磁碟中的。當Reduce的輸入檔案已定,整個Shuffle就結束了,然後就是Reduce執行,把結果放到HDFS中。




五. 其它

HDFS和MapReduce是Hadoop的基礎架構。除了上述講解之外,還有MapReduce容錯機制,任務JVM重用,作業排程器等都還沒有總結。徹底理解了MapReduce的工作原理之後就可以大量的MapReduce程式設計了,計劃將Hadoop自帶例項看完後,再研讀《Mahout實戰》,同步學習《Hadoop技術內幕:深入解析YARN架構設計與實現原理》,正式邁入Hadoop 2.x版本的大門。

參考文獻:

[1] 《Hadoop權威指南》(第二版)

[2] 《Hadoop應用開發技術詳解》

[3] Hadoop 0.18文件:http://hadoop.apache.org/docs/r1.0.4/cn/hdfs_design.html

[4] WordCount原始碼剖析:http://blog.csdn.net/recommender_system/article/details/42029311

[5] 外部排序技術之多路歸併排序:http://blog.chinaunix.net/uid-25324849-id-2182916.html