1. 程式人生 > >MapReduce與Yarn

MapReduce與Yarn

dfs apr 合並操作 相同 程序 combiner 計算 順序 大量

技術分享圖片

框架中有兩類節點(也可以說是進程),MasterWorker,當用戶提交一個計算作業(Job)的時候,會啟動Job唯一對應的Master進程,Master進程負責整個Job的調度,包括分配worker的角色(map或者reduce)、worker計算的數據,以及向用戶返回結果等等。而Worker負責的具體計算稱之為task,在MapReduce框架下,worker按照計算的階段又分為map worker和reduce worker,worker在master獲取計算任務,然後在文件系統讀取數據進行運算,並將結果寫入到臨時文件或者持久化文件系統。

  一個Job的流程是這樣的

  1. MapReduce將待運算的數據分為M份,每一份的大小為16M或者64M(這個跟默認使用的分布式文件系統GFS有關),每一份數據稱之為一個split
  2. 啟動M個map worker,讀取相應的split,然後調用用戶的Map function,對數據進行運算
  3. map worker周期性將計算結果(稱之為中間結果)寫入到R份本地文件中的其中一份,R是reduce worker的數量,具體寫入哪一個臨時文件 規則由Partitioning function指定
  4. 當一個map worker計算任務完成的時候,將R份中間結果的位置通知master,master通知對應的reduce worker
  5. reduce worker根據中間結果的位置,通過rpc從map worker上獲取與自己對應的中間結果,進行計算,並將計算結果寫入到持久化分布式文件系統,
  6. 當所有map reduce worker的計算任務結束之後,通知用戶計算結果

  當然,一個mapreduce的結果並不一定直接給用戶,很有可能是一個鏈式(chain)計算,即將一個mapreduce的輸出當做另一個mapreduce的輸入

MapTask執行的Timeline

技術分享圖片

這是Map Task任務執行時間線:

  • 初始化(INIT)階段:初始化Map Task(默認是什麽都沒有。。)
  • 執行(EXECUTION)階段: 對於每個 (key, value)執行map()函數
  • 排序(SPILLING)階段:map輸出會暫存到內存當中排序,當緩存達到一定程度時會寫到磁盤上,並刪除內存裏的數據
  • SHUFFLE 階段:排序結束後,會合並所有map輸出,並分區傳輸給reduce。

Shuffle階段

技術分享圖片

shuffle階段主要是做數據的排序和合並操作,然後把數據存到本地文件系統上,等待Reduce來獲取數據。等到所有的MapTask產出的數據傳輸都Reduce機器上,並對數據進行排序以後才能算是Shuffle過程的結束。也就說從Map函數出來之後到Reduce函數之前的所有數據操作都叫Shuffle操作,包括排序、合並、分區、傳輸等。

技術分享圖片

技術分享圖片

技術分享圖片

很好的設計值得學習借鑒。

  data locality,即計算離需要的數據存儲越近越好,以盡量避免網絡傳輸

  fault tolerance,分布式系統中,節點故障是常態,運行環境需要對用戶透明地監控、處理故障。由於mapreduce編程模型的線性無狀態特性,對於某一個worker的故障,只需將計算任務給其他worker負責就行

  backup task,按照木桶定律,即一只水桶能裝多少水取決於它最短的那塊木板,在mapreduce中,運行最為緩慢的worker會成為整個Job的短板。運行環境需要監控到異常緩慢的worker,主動將其上的task重新調度到其他worker上,以便在合理的時間結束整個Job,提高系統的吞吐。

  partitioning function & combiner function,這是用戶可以提供的另外兩個算子,實時上也是非常有用的。map worker的中間結果,通過partitioning function分發到R(R為reducer的數目)位本地文件,默認為“hash(key) mod R”。而Combiner function是對每一個map worker的結果先進行一次合並(partial merge),然後再寫入本地文件,以減少數據傳輸,較少reduce worker的計算任務。

不足或者沒有考慮到的點

  第一:master是單點,故障恢復依賴於周期性的checkpoint,不保證可靠性,因此發生故障的時候會通知用戶,用戶自行決定是否重新計算。

  第二:沒有提到作業(Job)的調度策略,運行時環境肯定是有大量的Job並發的,因此多樣且高效的調度策略是非常重要的,比如按優先級、按群組

  第三:並沒有提到資源(CPU、內存、網絡)的調度,或者說並不區分作業調度與資源調度。

  第四:沒有提到資源隔離與安全性,大量Job並發的時候,如何保證單個Job不占用過多的資源,如何保證用戶的程序對系統而言是安全的,在論文中並沒有提及

  第五:計算數據來源於文件系統,效率不是很高,不過本來就是用於離線任務,這個也不是什麽大問題

Yarn與MapReduce交互(MapReduce2.0)的架構如下:

技術分享圖片

1.客戶端生成相應的文件後,連接resourceManager請求提交一個application,resourceManager(RM)給客戶端返回一個地址,供客戶端提交文件,Client提交文件到指定的HDFS地址上, Client向RM申請運行該任務的MRAppMaster(其實第一步主要就是這個),MR將Client的請求的任務初始化為一個task放入到其維護的隊列中. 2.當MR的調度策略調度到該task時,向一個NodeManager分配該task(YARN共有三種調度策略,FIFO Scheduler,Capacity Scheduler,Fair Scheduler); NodeManager根據task描述創建一個容器Container,來運行該task,此次運行的應該是MRAppMaster。需要先從HDFS中將上傳的Job信息下載到容器中.(上次Mapreduce流程中說到,MRAppMaster是整個MR過程的控制中心) 3.MRAppMaster向ResourceManager (ApplicationsManager)反饋自身狀態信息,比如心跳等。 4.MRAppMaster根據Job中的信息向ResourceManager (ResourceSchedule)申請運行MapTask、ReduceTask的運行資源,ResourceManager (ResourceSchedule)告訴MRAppMaster去哪個NodeManager啟動MapTask、ReduceTask(Task也在Container容器中) 5.MRAppMaster通知各個NodeManager啟動MapTask、ReduceTask(Task也在Container容器中) 6.各個NodeManager分配資源容器Container,啟動MapTask、ReduceTask 7.Task運算結果返回給MRAppMaster 8.MRAppMaster向RM申請註銷自己,進行資源回收。

在Yarn中,有以下組件

  ResourceManager:資源管理器,接收用戶的請求,負載應用(application)的調度管理,啟動應用對應的ApplicationMaseter,並為每一個應用分配所需的資源

  NodeManager:每臺機器上的框架代理(framework agent),在每一個計算機節點上都有一個,用於本機上的Container,監控機器的資源使用情況,並向ResourceManager匯報

  ApplicationMaseter(圖中所有為App Mstr),每一個應用都有自己唯一的ApplicationMaseter,用於管理應用的生命周期,向ResourceMananger申請資源,監控任務對應的container

  Containner:具體任務task的計算單元,是一組資源的抽象,可用於以後實現資源的隔離。它封裝了某個節點上的多維度資源,如內存、CPU、磁盤、網絡等,當AM向RM申請資源時,RM為AM返回的資源便是用Container表示的。YARN會為每個任務分配一個Container,且該任務只能使用該Container中描述的資源。需要註意的是,Container不同於MRv1中的slot,它是一個動態資源劃分單位,是根據應用程序的需求動態生成的。目前為止,YARN僅支持CPU和內存兩種資源,且使用了輕量級資源隔離機制Cgroups進行資源隔離。

功能: 對task環境的抽象、描述一系列信息、任務運行資源的集合(cpu、內存、io等)、任務運行環境

  其中,ResourceManager包含兩個重要的組件,scheduler和ApplicationManager。scheduler負責為各種應用分配資源,支持各種調度算法,如 CapacityScheduler、 FairScheduler。ApplicationManager負責接收用於的請求,啟動應用對應的ApplicationMaseter。

YARN:應用程序啟動

在YARN,至少有三個演員:

  • 任務提交(客戶端)
  • 所述資源管理器(主設備)
  • 所述節點管理器(從設備)

應用程序啟動過程如下:

  1. 客戶端將應用程序提交給資源管理器
  2. 資源管理器分配容器
  3. 資源管理器聯系相關的節點管理器
  4. 節點管理器啟動容器
  5. Container執行Application Master

技術分享圖片

ResourceSchedule調度器的選擇

在YARN中有三種調度器可以選擇:FIFO Scheduler,Capacity Scheduler,Fair Scheduler。
FIFO Scheduler把應用按提交的順序排成一個隊列,是一個先進先出隊列,在進行資源分配的時候,先給隊列中最頭部的應用進行分配資源,待最頭部的應用需求滿足後再給下一個分配,以此類推。FIFO Scheduler是最簡單也是最容易理解的調度器,它不需要任何配置,但不適用於共享集群中。大的應用可能會占用所有集群資源,從而導致其它應用被阻塞。在共享集群中,更適合采用Capacity Scheduler或Fair Scheduler,這兩個調度器都允許大任務和小任務在提交的同時獲得一定的系統資源。

下面“YARN調度器對比圖”展示了這幾個調度器的區別,從圖中可以看出,在FIFO調度器中,小任務會被大任務阻塞。而對於Capacity調度器,設置了一個專門的隊列用來運行小任務,但是為小任務專門設置一個隊列會預先占用一定的集群資源,這就導致大任務的執行時間會落後於使用FIFO調度器時的時間。在Fair調度器中,不需要預先保留一定的系統資源,Fair調度器會為所有運行的job動態的調整系統資源。如下圖所示,當第一個大job提交時,只有這一個job在運行,此時它獲得了所有集群資源;當第二個小任務提交後,Fair調度器會分配一半資源給這個小任務,讓這兩個任務公平的共享集群資源。
需要註意的是,在下圖Fair調度器中,從第二個任務提交到獲得資源會有一定的延遲,因為它需要等待第一個任務釋放占用的Container。小任務執行完成之後也會釋放自己占用的資源,大任務又獲得了全部的系統資源。最終的效果就是Fair調度器即得到了高的資源利用率又能保證小任務及時完成。

Yarn調度器對比圖:

技術分享圖片

YARN的資源管理

1、資源調度和隔離是yarn作為一個資源管理系統,最重要且最基礎的兩個功能。資源調度由resourcemanager完成,而資源隔離由各個nodemanager實現。

2、Resourcemanager將某個nodemanager上資源分配給任務(這就是所謂的“資源調度”)後,nodemanager需按照要求為任務提供相應的資源,甚至保證這些資源應具有獨占性,為任務運行提供基礎和保證,這就是所謂的資源隔離。

3、當談及到資源時,我們通常指內存、cpu、io三種資源。Hadoop yarn目前為止僅支持cpu和內存兩種資源管理和調度。

4、內存資源多少決定任務的生死,如果內存不夠,任務可能運行失敗;相比之下,cpu資源則不同,它只會決定任務的快慢,不會對任務的生死產生影響。

Yarn的內存管理:

yarn允許用戶配置每個節點上可用的物理內存資源,註意,這裏是“可用的”,因為一個節點上內存會被若幹個服務貢享,比如一部分給了yarn,一部分給了hdfs,一部分給了hbase等,yarn配置的只是自己可用的,配置參數如下:

yarn.nodemanager.resource.memory-mb

表示該節點上yarn可以使用的物理內存總量,默認是8192m,註意,如果你的節點內存資源不夠8g,則需要調減這個值,yarn不會智能的探測節點物理內存總量。

yarn.nodemanager.vmem-pmem-ratio

任務使用1m物理內存最多可以使用虛擬內存量,默認是2.1

yarn.nodemanager.pmem-check-enabled

是否啟用一個線程檢查每個任務證使用的物理內存量,如果任務超出了分配值,則直接將其kill,默認是true。

yarn.nodemanager.vmem-check-enabled

是否啟用一個線程檢查每個任務證使用的虛擬內存量,如果任務超出了分配值,則直接將其kill,默認是true。

yarn.scheduler.minimum-allocation-mb

單個任務可以使用最小物理內存量,默認1024m,如果一個任務申請物理內存量少於該值,則該對應值改為這個數。

yarn.scheduler.maximum-allocation-mb

單個任務可以申請的最多的內存量,默認8192m

Yarn cpu管理:

目前cpu被劃分為虛擬cpu,這裏的虛擬cpu是yarn自己引入的概念,初衷是考慮到不同節點cpu性能可能不同,每個cpu具有計算能力也是不一樣的,比如,某個物理cpu計算能力可能是另外一個物理cpu的2倍,這時候,你可以通過為第一個物理cpu多配置幾個虛擬cpu彌補這種差異。用戶提交作業時,可以指定每個任務需要的虛擬cpu個數。在yarn中,cpu相關配置參數如下:

yarn.nodemanager.resource.cpu-vcores

表示該節點上yarn可使用的虛擬cpu個數,默認是8個,註意,目前推薦將該值為與物理cpu核數相同。如果你的節點cpu合數不夠8個,則需要調減小這個值,而yarn不會智能的探測節點物理cpu總數。

yarn.scheduler.minimum-allocation-vcores

單個任務可申請最小cpu個數,默認1,如果一個任務申請的cpu個數少於該數,則該對應值被修改為這個數

yarn.scheduler.maximum-allocation-vcores

單個任務可以申請最多虛擬cpu個數,默認是32.

MapReduce與Yarn