MapReduce工作機制
執行流程
當你在MapReduce程式中呼叫了Job例項的Submit()或者waitForCompletion()方法,該程式將會被提交到Yarn中執行
其中的過程大部分被Hadoop隱藏起來了,對開發者來說是透明的
程式執行的過程涉及到個概念:
1.Client:提交程式的客戶端
2.ResourceManager:叢集中的資源分配管理
3.NodeManager:啟動和監管各自節點上的計算資源
4.ApplicationMaster:每個程式對應一個AM,負責程式的任務排程,本身也是執行在NM的Container中
5.HDFS:分散式檔案系統,和以上各個實體進行作業資料的共享
MapReduce作業在Yarn中的執行流程如圖所示:
這張圖和Hadoop核心元件之Yarn中提到的流程圖很相似
因為MapReduce作業也是屬於Yarn管理的一部分,只是這張圖針對MapReduce的執行更加細化了
作業提交
1.客戶端呼叫Job例項的Submit()或者waitForCompletion()方法提交作業
2.客戶端向RM請求分配一個Application ID
進行步驟2的時候,客戶端會對程式的輸出路徑進行檢查,如果沒有問題,進行作業輸入分片的計算
3.將作業執行所需要的資源拷貝到HDFS中,包括jar包、配置檔案和計算出來的輸入分片資訊等
4.呼叫RM的submitApplication方法將作業提交到RM
作業初始化
5a.RM收到submitApplication方法的呼叫之後會命令一個NM啟動一個Container
5b.在該NM的Container上啟動管理該作業的ApplicationMaster程序
6.AM對作業進行初始化操作,並將會接收作業的處理和完成情況報告
7.AM從HDFS中獲得輸入資料的分片資訊
在步驟7中,AM將會根據分片資訊確定要啟動的map任務數,reduce任務數則根據mapreduce.job.reduces屬性或者Job例項的setNumReduceTasks方法來決定
任務分配
8.AM為其每個map和reduce任務向RM請求計算資源
步驟8中,map任務的資源分配是要優先於reduce任務的,因為在reduce的排序階段開始之前,map任務必須全部完成
因此,reduce任務的資源請求只有當map任務完成了至少5%的時候才會進行
reduce任務是可以在叢集上的任意一個節點執行的,所以進行計算資源分配的時候RM不需要為reduce任務考慮分配哪個節點的資源給它
但是map任務不一樣,map任務有一個數據本地化的優化特性
資料本地優化是指map任務中處理的資料儲存在各個執行map本身的節點上,這能夠使得作業以最好的狀態執行,因為不需要跨界點消耗網路頻寬進行資料傳輸
移動計算而不移動資料
Yarn會優先給map任務分配本地資料,如果不存在,則在同一機架內的不同節點上搜尋資料,最差的情況是跨機架之間的資料傳輸
map每個split大小預設和hdfs的block塊大小一致的原因就是:
太大,會導致map讀取的資料可能跨越不同的節點,沒有了資料本地化的優勢
太小,會導致map數量過多,任務啟動和切換開銷太大,並行度過高
RM在map任務要處理的資料塊的那個節點上為map分配計算資源,如此一來,map任務就不需要跨網路進行資料傳輸了
因為AM中有輸入資料的分片資訊和要啟動的map任務的資訊,所以在為map任務請求資源的時候,RM會根據這些資訊為map分配計算資源
這裡的計算資源指的是map/reduce任務執行所需要的記憶體和CPU等,預設每個任務分配1024M的記憶體和一個CPU虛擬核心
可以通過修改以下選項來修改這個配置:
- mapreduce.map.memory.mb
- mapreduce.reduce.memory.mb
- mapreduce.map.cpu.vcores
- mapreduce.reduce.cpu.vcores
任務執行
9a.AM在RM指定的NM上啟動Container
9b.在Container上啟動任務(通過YarnChild進行來執行)
10.在真正執行任務之前,從HDFS從將任務執行需要的資源拷貝到本地,包括jar包、配置檔案資訊和分散式快取檔案等
11.執行map/reduce任務
作業完成
作業執行過程中,我們可以通過Yarn Web UI介面的AM頁面中檢視作業的執行資訊
當在客戶端呼叫waitForCompletion方法,每隔5秒鐘客戶端會檢查一次作業的執行情況
作業執行完畢之後將呼叫OutputCommitter方法對作業的資訊進行最後的清理工作
失敗處理
在實際場景中,使用者的程式碼總是會有bug、程式異常和節點失效等問題,Hadoop提供了失敗處理的機制儘可能的保證使用者的作業可以被順利完成
其中的過程需要考慮以下幾個實體:
Task的失敗
任務失敗標記
當由於使用者的程式碼導致map/reduce任務產生執行時異常時
在該任務退出之前,JVM會發送報告給AM,並且將錯誤資訊寫入使用者日誌中
AM將該任務標記為失敗,釋放Container資源
除了使用者程式碼之外,還有很多其他原因會導致任務失敗,如JVM的bug等
AM會間隔性的接收來自各個任務的彙報,當一段時間過後AM沒有接收到某個任務的報告
AM將會判斷該任務超時,將該任務標記為失敗並讓節點上的任務退出
任務超時的時間可以在作業中通過mapreduce.task.timeout選項來為每個作業單獨配置
設定為0表示無任務超時時間,此時任務執行再久也不會被標記為失敗,其資源也無法釋放,會導致叢集效率降低
失敗任務的重啟
當AM注意到一個任務失敗了之後,將會嘗試重新排程該任務
任務的重試不會在之前失敗了的節點上執行,並且失敗四次之後AM將不會繼續重啟該任務
這個值同樣是可以配置的:
- mapreduce.map.maxattempts
- mapreduce.reduce.maxattempts
預設的,一旦作業中有任何一個任務失敗超過4次,那麼整個作業將會標記為失敗
但是很多情況下,即使作業中的某些任務失敗了,其他任務的執行結果還是有價值的
所以我們可以配置一個作業中允許任務失敗的最大比例:
- mapreduce.map.failures.maxpercent
- mapreduce.reduce.failures.maxpercent
ApplicationMater的失敗
和任務失敗一樣,AM也可能由於各種原因(如網路問題或者硬體故障)失效,Yarn同樣會嘗試重啟AM
可以為每個作業單獨配置AM的嘗試重啟次數:mapreduce.am.max-attempts,預設值為2
需要注意的是,Yarn中限制了每個AM重啟的最大限制,預設也為2,如果為單個作業設定重啟次數為3,超過了這個上限也不會起到作用
所以還需要注意將Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts
由於AM會通過心跳機制向RM資訊,當RM注意到AM失敗了之後,會在另外一個節點的Container上重啟,並將恢復已經執行的任務進度(心跳機制保留)
使得重啟的AM不用重頭執行任務,任務進度恢復預設是開啟的,可以通過yarn.app.mapreduce.am.job.recovery.enable為false來禁用
客戶端通過AM來獲得作業的執行情況,當AM失效的時候,客戶端會重新向RM請求新的AM地址來更新資訊
NodeManager的失敗
NM也通過心跳機制向RM彙報情況,當一個NM失效,或者執行緩慢的時候,RM將收不到該NM的心跳,或者心跳時間超時
此時RM會認為該NM失敗並移出可用NM管理池,心跳超時的時間通過yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms來配置
預設為10分鐘
在失敗的NM上執行的AM或者任務都會按照之前討論的機制進行恢復
Yarn對於NM的管理還有一個類似黑名單的功能,當該NM上的任務失敗次數超過3次之後(預設),該NM會被拉入黑名單
此時,即使該NM沒有失效,AM也不會在該NM上執行任務了
NM上的最大任務失敗次數可以通過mapreduce.job.maxtaskfailures.per.tracker來配置
ResourceManager的失敗
RM是Yarn中最終要的一個角色,沒有了RM叢集將無法使用
RM類似於HDFS中的Namenode,需要一種高可用的機制來保障
RM一開始就使用一種檢查點的機制來將叢集資訊持久化到磁碟中
當RM失效了之後,管理員可以手動啟動一個備用的RM讀取持久化的資訊
Shuffle和Sort
Shuffle是MapReduce中的核心概念,也是程式碼優化中很重要的一部分
理解Shuffle過程可以幫助你寫出更高階的MapReduce程式
Map Side
如上圖所示,map函式在產生資料的時候並不是直接寫入磁碟的,而是先寫入一個記憶體中的環形緩衝區
理由是在記憶體中對資料進行分割槽、分組排序等操作對比在磁碟上快很多
每個map都有一個緩衝區,預設大小為100M
當緩衝區的資料達到一個閾值的時候將會產生spill操作寫入到磁碟中,閾值預設為80%
緩衝區中的資料spill的時候,map產生的資料會源源不斷的寫入到緩衝區中空出來的空間
當緩衝區佔滿的時候,map任務將會阻塞直到緩衝區可以寫入資料
在map的資料寫入磁碟之前,在記憶體的緩衝區中會根據程式中設定的分割槽數對資料進行分割槽
並在每個分割槽內對資料進行分組和排序
如果設定了combiner函式,其將會在排序之後的資料上執行,以減少寫入到磁碟的資料量
緩衝區的每次spill操作都會在磁碟上產生一個spill檔案,所以一個map可能產生多個spill檔案
任務完成之前,這些spill檔案會被合併為一個已分割槽且已排序的輸出檔案
如果至少存在三個spill檔案(預設)且設定了combiner函式,那麼在合併spill檔案再次寫入到磁碟的時候會再次呼叫combiner
此時Map端的任務完成
Reduce Side
Map端的任務完成之後,Reduce端將會啟動會多個執行緒通過HTTP的方式到Map端獲取資料
為了優化reduce的執行時間,hadoop中等第一個map結束後,所有的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分資料
在這個shuffle過程中,由於map的數量通常是很多個的,而每個map中又都有可能包含每個reduce所需要的資料
所以對於每個reduce來說,去各個map中拿資料也是並行的
Reduce端會啟動一個執行緒週期性的向AM請求Map端產生資料所在的位置(map任務和AM之間有心跳機制)
Map端產生的資料並不會在Reduce端獲取之後馬上刪除,因為reduce任務可能會因為失敗而重啟
Reduce端將Map端的資料拷貝過來之後也會放入一個記憶體緩衝區中,資料達到緩衝區的指定閾值之後h合併寫入到磁碟
隨著磁碟資料檔案的增多,和Map端一樣,Reduce端也會對溢位檔案進行合併
mapreduce.task.io.sort.factor可以控制Map和Reduce端的檔案數量達到多少個時進行合併
和Map端的合併不同,假設上述選項採用預設值10,共有40個溢位檔案
Map端最終會合並形成4個檔案
而Reduce端第一次只會合併4個檔案,隨後三次各合併10個檔案,還剩餘6個檔案
此時Reduce端中的檔案還有10個,最後一次合併為1個檔案輸入到reduce函式中
由此可以看出,Reduce端的合併目標是合併最小的檔案數量以滿足最後一次合併剛好達到設定的檔案合併係數
其目的是為了reduce讀取減少磁碟的開銷
如果指定了combiner函式則會在合併期間執行
隨後進入reduce函式的執行階段,併產生資料輸出到HDFS
由於大部分情況下,執行NM的節點往往還執行著Datanode,所以輸出資料的第一個副本通常是儲存在本地
效能調優
作者:@小黑