MapReduce部分:MapReduce基於yarn的工作原理
MapReduce基於yarn的工作原理:
我們通過提交jar包,進行MapReduce處理,那麼整個執行過程分為五個環節:
1、向client端提交MapReduce job.
2、隨後yarn的ResourceManager進行資源的分配.
3、由NodeManager進行載入與監控containers.
4、通過applicationMaster與ResourceManager進行資源的申請及狀態的互動,由NodeManagers進行MapReduce執行時job的管理.
5、通過hdfs進行job配置檔案、jar包的各節點分發。
Job 提交過程
job的提交通過呼叫submit()方法
1、向ResourceManager申請application ID,此ID為該MapReduce的jobId。
2、檢查output的路徑是否正確,是否已經被建立。
3、計算input的splits。
4、拷貝執行job 需要的jar包、配置檔案以及計算input的split 到各個節點。
5、在ResourceManager中呼叫submitAppliction()方法,執行job
Job 初始化過程
1、當resourceManager收到了submitApplication()方法的呼叫通知後,scheduler開始分配container,隨之ResouceManager傳送applicationMaster程序,告知每個nodeManager管理器。
2、由applicationMaster決定如何執行tasks,如果job資料量比較小,applicationMaster便選擇將tasks執行在一個JVM中。那麼如何判別這個job是大是小呢?當一個job的mappers數量小於10個,只有一個reducer或者讀取的檔案大小要小於一個HDFS block時,(可通過修改配置項mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 進行調整)
3、在執行tasks之前,applicationMaster將會呼叫setupJob()方法
Task 任務分配
1、接下來applicationMaster向ResourceManager請求containers用於執行map與reduce的tasks(step 8),這裡map task的優先順序要高於reduce task,當所有的map tasks結束後,隨之進行sort(這裡是shuffle過程後面再說),最後進行reduce task的開始。(這裡有一點,當map tasks執行了百分之5%的時候,將會請求reduce,具體下面再總結)
2、執行tasks的是需要消耗記憶體與CPU資源的,預設情況下,map和reduce的task資源分配為1024MB與一個核,(可修改執行的最小與最大引數配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)
Task 任務執行
1、這時一個task已經被ResourceManager分配到一個container中,由applicationMaster告知nodemanager啟動container,這個task將會被一個主函式為YarnChild的java application執行,但在執行task之前,首先定位task需要的jar包、配置檔案以及載入在快取中的檔案。
2、YarnChild運行於一個專屬的JVM中,所以任何一個map或reduce任務出現問題,都不會影響整個nodemanager的crash或者hang。
3、每個task都可以在相同的JVM task中完成,隨之將完成的處理資料寫入臨時檔案中。
Mapreduce資料流
執行進度與狀態更新
1、MapReduce是一個較長執行時間的批處理過程,可以是一小時、幾小時甚至幾天,那麼Job的執行狀態監控就非常重要。每個job以及每個task都有一個包含job(running,successfully completed,failed)的狀態,以及value的計數器,狀態資訊及描述資訊(描述資訊一般都是在程式碼中加的列印資訊),那麼,這些資訊是如何與客戶端進行通訊的呢?
2、當一個task開始執行,它將會保持執行記錄,記錄task完成的比例,對於map的任務,將會記錄其執行的百分比,對於reduce來說可能複雜點,但系統依舊會估計reduce的完成比例。當一個map或reduce任務執行時,子程序會持續每三秒鐘與applicationMaster進行互動。
Job 完成
最終,applicationMaster會收到一個job完成的通知,隨後改變job的狀態為successful。最終,applicationMaster與task containers被清空。
Shuffle與Sort
從map到reduce的過程,被稱之為shuffle過程,MapReduce使到reduce的資料一定是經過key的排序的,那麼shuffle是如何運作的呢?
當map任務將資料output時,不僅僅是將結果輸出到磁碟,它是將其寫入記憶體緩衝區域,並進行一些預分類。
1、The Map Side
首先map任務的output過程是一個環狀的記憶體緩衝區,緩衝區的大小預設為100MB(可通過修改配置項mpareduce.task.io.sort.mb進行修改),當寫入記憶體的大小到達一定比例,預設為80%(可通過mapreduce.map.sort.spill.percent配置項修改),便開始寫入磁碟。
在寫入磁碟之前,執行緒將會指定資料寫入與reduce相應的patitions中,最終傳送給reduce.在每個partition中,後臺執行緒將會在記憶體中進行Key的排序,(如果程式碼中有combiner方法,則會在output時就進行sort排序,這裡,如果只有少於3個寫入磁碟的檔案,combiner將會在outputfile前啟動,如果只有一個或兩個,那麼將不會呼叫)
這裡將map輸出的結果進行壓縮會大大減少磁碟IO與網路傳輸的開銷(配置引數mapreduce.map .output.compress 設定為true,如果使用第三方壓縮jar,可通過mapreduce.map.output.compress.codec進行設定)
隨後這些paritions輸出檔案將會通過HTTP傳送至reducers,傳送的最大啟動執行緒通過mapreduce.shuffle.max.threads進行配置。
2、The Reduce Side
首先上面每個節點的map都將結果寫入了本地磁碟中,現在reduce需要將map的結果通過叢集拉取過來,這裡要注意的是,需要等到所有map任務結束後,reduce才會對map的結果進行拷貝,由於reduce函式有少數幾個複製執行緒,以至於它可以同時拉取多個map的輸出結果。預設的為5個執行緒(可通過修改配置mapreduce.reduce.shuffle.parallelcopies來修改其個數)
這裡有個問題,那麼reducers怎麼知道從哪些機器拉取資料呢?
當所有map的任務結束後,applicationMaster通過心跳機制(heartbeat mechanism),由它知道mapping的輸出結果與機器host,所以reducer會定時的通過一個執行緒訪問applicationmaster請求map的輸出結果。
Map的結果將會被拷貝到reduce task的JVM的記憶體中(記憶體大小可在mapreduce.reduce.shuffle.input.buffer.percent中設定)如果不夠用,則會寫入磁碟。當記憶體緩衝區的大小到達一定比例時(可通過mapreduce.reduce.shuffle.merge.percent設定)或map的輸出結果檔案過多時(可通過配置mapreduce.reduce.merge.inmen.threshold),將會除法合併(merged)隨之寫入磁碟。
這時要注意,所有的map結果這時都是被壓縮過的,需要先在記憶體中進行解壓縮,以便後續合併它們。(合併最終檔案的數量可通過mapreduce.task.io.sort.factor進行配置) 最終reduce進行運算進行輸出。