1. 程式人生 > >MapReduce部分:MapReduce基於yarn的工作原理

MapReduce部分:MapReduce基於yarn的工作原理

MapReduce基於yarn的工作原理:

我們通過提交jar包,進行MapReduce處理,那麼整個執行過程分為五個環節:

  1、向client端提交MapReduce job.

  2、隨後yarn的ResourceManager進行資源的分配.

  3、由NodeManager進行載入與監控containers.

  4、通過applicationMaster與ResourceManager進行資源的申請及狀態的互動,由NodeManagers進行MapReduce執行時job的管理.

  5、通過hdfs進行job配置檔案、jar包的各節點分發。

Job 提交過程

  job的提交通過呼叫submit()方法

建立一個JobSubmitter例項,並呼叫submitJobInternal()方法。整個job的執行過程如下:

  1、向ResourceManager申請application ID,此ID為該MapReduce的jobId。

  2、檢查output的路徑是否正確,是否已經被建立。

  3、計算input的splits。

  4、拷貝執行job 需要的jar包、配置檔案以及計算input的split 到各個節點。

  5、在ResourceManager中呼叫submitAppliction()方法,執行job

Job 初始化過程

  1、當resourceManager收到了submitApplication()方法的呼叫通知後,scheduler開始分配container,隨之ResouceManager傳送applicationMaster程序,告知每個nodeManager管理器。

  2、由applicationMaster決定如何執行tasks,如果job資料量比較小,applicationMaster便選擇將tasks執行在一個JVM中。那麼如何判別這個job是大是小呢?當一個job的mappers數量小於10個只有一個reducer或者讀取的檔案大小要小於一個HDFS block時,(可通過修改配置項mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 進行調整)

  3、在執行tasks之前,applicationMaster將會呼叫setupJob()方法

,隨之建立output的輸出路徑(這就能夠解釋,不管你的mapreduce一開始是否報錯,輸出路徑都會建立)

Task 任務分配

  1、接下來applicationMaster向ResourceManager請求containers用於執行map與reduce的tasks(step 8),這裡map task的優先順序要高於reduce task,當所有的map tasks結束後,隨之進行sort(這裡是shuffle過程後面再說),最後進行reduce task的開始。(這裡有一點,當map tasks執行了百分之5%的時候,將會請求reduce,具體下面再總結)

  2、執行tasks的是需要消耗記憶體與CPU資源的,預設情況下,map和reduce的task資源分配為1024MB與一個核,(可修改執行的最小與最大引數配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

Task 任務執行

  1、這時一個task已經被ResourceManager分配到一個container中,由applicationMaster告知nodemanager啟動container,這個task將會被一個主函式為YarnChild的java application執行,但在執行task之前,首先定位task需要的jar包、配置檔案以及載入在快取中的檔案

  2、YarnChild運行於一個專屬的JVM中,所以任何一個map或reduce任務出現問題,都不會影響整個nodemanager的crash或者hang

  3、每個task都可以在相同的JVM task中完成,隨之將完成的處理資料寫入臨時檔案中。

Mapreduce資料流

執行進度與狀態更新

  1、MapReduce是一個較長執行時間的批處理過程,可以是一小時、幾小時甚至幾天,那麼Job的執行狀態監控就非常重要。每個job以及每個task都有一個包含job(running,successfully completed,failed)的狀態,以及value的計數器,狀態資訊及描述資訊(描述資訊一般都是在程式碼中加的列印資訊),那麼,這些資訊是如何與客戶端進行通訊的呢?

  2、當一個task開始執行,它將會保持執行記錄,記錄task完成的比例,對於map的任務,將會記錄其執行的百分比,對於reduce來說可能複雜點,但系統依舊會估計reduce的完成比例。當一個map或reduce任務執行時,子程序會持續每三秒鐘與applicationMaster進行互動

Job 完成

   最終,applicationMaster會收到一個job完成的通知,隨後改變job的狀態為successful。最終,applicationMaster與task containers被清空。

Shuffle與Sort

  從map到reduce的過程,被稱之為shuffle過程,MapReduce使到reduce的資料一定是經過key的排序的,那麼shuffle是如何運作的呢?

  當map任務將資料output時,不僅僅是將結果輸出到磁碟,它是將其寫入記憶體緩衝區域,並進行一些預分類

  

  1、The Map Side

  首先map任務的output過程是一個環狀的記憶體緩衝區,緩衝區的大小預設為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體的大小到達一定比例,預設為80%(可通過mapreduce.map.sort.spill.percent配置項修改),便開始寫入磁碟。

  在寫入磁碟之前,執行緒將會指定資料寫入與reduce相應的patitions中,最終傳送給reduce.在每個partition中,後臺執行緒將會在記憶體中進行Key的排序,(如果程式碼中有combiner方法,則會在output時就進行sort排序,這裡,如果只有少於3個寫入磁碟的檔案,combiner將會在outputfile前啟動,如果只有一個或兩個,那麼將不會呼叫)

  這裡將map輸出的結果進行壓縮會大大減少磁碟IO與網路傳輸的開銷(配置引數mapreduce.map .output.compress 設定為true,如果使用第三方壓縮jar,可通過mapreduce.map.output.compress.codec進行設定)

   隨後這些paritions輸出檔案將會通過HTTP傳送至reducers,傳送的最大啟動執行緒通過mapreduce.shuffle.max.threads進行配置。

  2、The Reduce Side

  首先上面每個節點的map都將結果寫入了本地磁碟中,現在reduce需要將map的結果通過叢集拉取過來,這裡要注意的是,需要等到所有map任務結束後,reduce才會對map的結果進行拷貝,由於reduce函式有少數幾個複製執行緒,以至於它可以同時拉取多個map的輸出結果。預設的為5個執行緒(可通過修改配置mapreduce.reduce.shuffle.parallelcopies來修改其個數)

  這裡有個問題,那麼reducers怎麼知道從哪些機器拉取資料呢? 

  當所有map的任務結束後,applicationMaster通過心跳機制(heartbeat mechanism),由它知道mapping的輸出結果與機器host,所以reducer會定時的通過一個執行緒訪問applicationmaster請求map的輸出結果

  Map的結果將會被拷貝到reduce task的JVM的記憶體中(記憶體大小可在mapreduce.reduce.shuffle.input.buffer.percent中設定)如果不夠用,則會寫入磁碟。當記憶體緩衝區的大小到達一定比例時(可通過mapreduce.reduce.shuffle.merge.percent設定)或map的輸出結果檔案過多時(可通過配置mapreduce.reduce.merge.inmen.threshold),將會除法合併(merged)隨之寫入磁碟。

  這時要注意,所有的map結果這時都是被壓縮過的,需要先在記憶體中進行解壓縮,以便後續合併它們。(合併最終檔案的數量可通過mapreduce.task.io.sort.factor進行配置) 最終reduce進行運算進行輸出。