MapReduce 圖解流程超詳細解答(1)-【map階段】
- Map Phase:若干 Map Tasks 被執行
- Reduce Phase: 若干Reduce Tasks 被執行
reduce可能會在map階段結束之前開始執行,因此上面顯示的有重疊的地方。
現在我們集中考察map相,一個關鍵的問題是一個應用需要多少map任務去執行現在的這個job
使用者給了我們什麼?
我們退回到之前的一步,當一個使用者提交一個應用的時候,若干資訊被提供給了YARN ,分別是:
- 一個配置:這可以是一部分的,因為一些引數不需要使用者特別指定,可以有自己的預設值。
- 一個jar檔案,含有一個map,一個combiner,一個reduce
- 一個輸入和輸出資訊 輸入目錄 是不是在hdfs上,有多少檔案呢?輸出的時候,我們儲存在哪裡
The number of files inside the input directory is used for deciding the number of Map Tasks of a job.
那麼,輸入的目錄中檔案的數量決定多少個map會被執行起來
多少個map任務?
應用針對每一個分片執行一個map,一般而言,對於每一個輸入的檔案會有一個map split。如果輸入檔案太大,超過了hdfs塊的大小(64M)那麼對於同一個輸入檔案我們會有多餘2個的map執行起來。下面是FileInputFormat
class 的getSplits()的虛擬碼:
num_splits = 0 for each input file f: remaining = f.length while remaining / split_size > split_slope: num_splits += 1 remaining -= split_size
where:
split_slope = 1.1 分割斜率
split_size =~ dfs.blocksize 分割大小約等於hdfs塊大小
在mapreduce2.0以上版本mapreduce.job.maps
屬性會被忽略
MapTask Launch
啟動MapTask
mapreduce應用會向資源管理器請求這個job需要的容器,一個maptask容器請求每一個maptask。一個容器對每一個maptask的請求會嘗試利用map分片的本地性,應用會請求一下資料:
- 請求map split 和container在同一個節點管理器的container
- 如果沒有,請求一個map split 和container在
- 否則請求任意節點管理器上的container
這只是一小部分資源任務。資源任務器在資源任務器既定目標和指定目標衝突的時候,可以忽略本地性。當一個容器被分配一個任務,map就馬上啟動了。
Map階段:一個執行階段的例子
map 相的一個簡要圖:
- 有兩個節點管理器:每一個2GB的記憶體,每一個map需要1GB我們可以並行執行兩個容器。這是最好的情況,而資源任務器的決策可能會有所不同
- 叢集沒有其他的YARN任務執行
- 我們的job有8個map分片,也就是在輸入資料夾中有7個檔案,只有一個是大於hdfs塊大小的,需要被拆分為兩個檔案。
map任務的執行時間線
現在我們可以聚焦單個的map task:這是單個map的執行時間線:
- 初始相:我們設定map任務
- 執行相:map分片裡面的每一個鍵值對進行map()函式運算
- 溢寫相:map的輸出儲存在環形記憶體緩衝區,當緩衝區滿80%(一般80%),啟動溢寫相,將緩衝的資料寫出到磁碟。
- 洗牌相:在溢寫相的結尾,我們合併多有的輸出,並且打包他們以便進行reduce相處理。
map任務:初始化
在初始化階段,我們:
- 建立一個上下文物件(context)(TaskAttemptContext)
- 建立使用者map.class例項
- 設定輸入
- 設定輸出
- 建立mapper的上下文(
MapContext.class
,Mapper.Context.class)
- 初始化輸入也就是:
- 建立
SplitLineReader.class 分片行閱讀器
- 建立HdfsDataInputStream.class hdfs資料輸入流
Map任務:執行階段
執行階段通過Mapper
class.的run()方法:
使用者可以重寫這個方法,但是預設的時候通常會呼叫setup而啟動這個程式。這個函式預設並不做什麼有用的 事情,但是可以被使用者覆蓋重寫以便於設定任務(例如初始化類的變數),當設定完成之後,分片的每一個鍵值對會激發map()方法。因此map()接收到一個鍵,一個值,以及一個上下文context。使用這個上下文物件,一個map就會儲存其輸出到快取中。
請注意,map分片是一個快一個塊擷取的(例如64kb),每一個快分割成為若干鍵值對的資料( SplitLineReader.class乾的好事
),這是在Mapper.Context.nextKeyValue內部完成的。當map分片被全部處理之後,run()會呼叫clean()方法。預設的,沒有什麼會被執行,除非使用者重寫覆蓋他。
map任務:溢寫階段
正如我們在執行階段看到的一樣,map會使用Mapper.Context.write()將map函式的輸出溢寫到記憶體中的環形緩衝區(
MapTask.MapOutputBuffer
)。緩衝區的大小是固定的,通過mapreduce.task.io.sort.mb
(default:
100MB)指定。
任何時候當這個緩衝區將要充滿的時候(mapreduce.map.
sort.spill.percent
: 預設80% ),溢寫將會被執行(這是一個並行過程,使用的是單獨的執行緒,緩衝池還可以繼續被寫入)。如果溢寫執行緒太慢,而緩衝區又忙了的話,map()就會暫停執行而等待。
溢寫執行緒執行下面的動作:
- 建立一個溢寫記錄
SpillRecord
和一個FSOutputStream
檔案輸出流(本地檔案系統) - 記憶體內排序緩衝中的塊:輸出的資料會使用快排演算法按照partitionIdx, key排序
- 排序之後的輸出會分割成為分割槽:每一個分割槽對應一個reduce
- 分割槽序列化寫到本地檔案