MapReduce 的核心流程
下文中新舊的意思分別代表 Hadoop 0.20 前後。原因是 MapReduce 在這個版本進行了一次大改,主要的特點就是劃分了新舊兩個包名。新版的特點是使用了抽象類代替一些可擴充套件的介面,以及增加了 Context 的概念。比如說,MapContext
中就封裝了獲取切片、讀取 Record 等功能,而MapContextImpl
就是對 reader 和 writer 邏輯的封裝。
有時候,我們會不小心配置錯誤,比如把mapreduce.job.combine.class
配置到了mapred.combiner.class
,就可能導致啟用了舊的模組。
在org.apache.hadoop.mapred.TaskStatus.Phase
STARTING,MAP,SHUFFLE,SORT,REDUCE,CLEANUP
。當然我們在理解的時候可能加入一些自己定義的階段(根據動作),比如 Split。
Split
整個分片在客戶端進行。Split 其實是是邏輯上的分片,但其大小依賴於底層的檔案塊。這個公式搞 Hadoop 的人都很熟悉:
protected long computeSplitSize(long goalSize,long minSize,long blockSize) {
return Math.max(minSize,Math.min(goalSize,blockSize));
}
複製程式碼
它其實就是保證了。它是FileInputFormat
裡專門設定的規則,且這個並不是固定的,而是針對每個檔案(因為 MapReduce 支援傳入路徑,而且是多個路徑)。這裡你就可以看到一個可以調優的點了:顯然這裡需要進行和 namenode 的通訊,這樣就需要允許設定mapreduce.input.fileinputformat.list-status.num-threads
增加遍歷檔案資訊時的執行緒數,這很合理(當然,略有一些奇怪的地方是這個配置實際上影響的是客戶端)。
Split的核心類是InputFormat
。這個介面必須實現getSplits
和getRecordReader
方法,這對應著將輸入的檔案劃分為分片並對映到 Map 的輸入。一般來說,我們預設使用的是TextInputFormat
LineRecordReader
讀取模式和FileInputFormat
的按塊切分。其他的模式也很重要,比如CombineFileInputFormat
(可以讀取若干小檔案,這個和 SequenceFile 合在一起不同,只是打包了元資料),比如KeyValueTextInputFormat
可以讀取鍵值對,比如NLineInputFormat
允許指定切分的行數(其實我覺得應該實現在TextInputFormat
裡)。LineRecordReader
還有一個細節是它實現了對壓縮檔案的自解釋。
這裡還是有一些值得細想的地方。比如,當獲取了檔案的分塊資訊之後,如何對任務的劃分進行優化(本地化)?如果是本地檔案,如何寫入 HDFS 並補全元資訊?
顯然,本地化已經涉及到作業排程,所以這裡的邏輯應該在 AM 處理。因此這裡JobSubmitter
會將資料進行序列化(具體可以參考writeJobSplitMetaInfo
),之後在JobImpl.InitTransition
的createSplits
方法可以看到它讀取了一組TaskSplitMetaInfo
資訊。JobImpl
中只會建立 Map 和 Reduce 任務,具體的排程就要參考排程器的實現了。
在listStatus
呼叫getFileSystem
後,會根據 schema 得到對應的FileSystem
實現類,本地就是LocalFileSystem
。這裡並沒有做特殊的處理,而是將這個位置也作為元資訊傳遞了。
Map
Mapper
裡的MapContext
會從InputFormat
裡取 RecordReader 作為記錄讀取的方法。這裡的實現比較簡單而且不太重要,重點是理解 Mapper 的run
方法:
public void run(Context context) throws IOException,InterruptedException {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(),context.getCurrentValue(),context);
}
} finally {
cleanup(context);
}
}
複製程式碼
其中setup
和cleanup
只呼叫一次,都是平時 MR 常見的回撥點。
我們知道在 Map 中會將結果分割槽寫入不同檔案。這裡提供了一個環形緩衝區(類似於其他的緩衝設計,比如訊息佇列)MapOutputBuffer
來平滑輸出,它繼承了MapOutputCollector
。在每次緩衝區溢位(spill,其實閾值為80%)後,都會寫入一個臨時檔案。之後,Map 會合並臨時檔案到最終的分割槽檔案。這個過程有大量的配置優化點,比如mapreduce.task.io.sort.mb
(控制緩衝區大小),mapreduce.task.io.sort.factor
(控制進行 merge 的檔案數,另外這個也針對 Reduce),mapreduce.map.output.compress
(進行 map 結果壓縮)等等。
注意,分割槽不代表寫入不同檔案,而是檔案的不同位置。
Spill
spill 過程是由獨立的執行緒執行的,不過為什麼不能和 Reducer 一樣多個執行緒寫呢?主要是這種多寫者問題幾乎必然是有鎖的(無鎖演演算法必須使用其他特定資料結構),效率並不比單寫者高。
在寫入磁碟前,Map 會進行排序(sort),這點比較容易忽略。你可以這樣記住它:MapOutputBuffer
實現了IndexedSortable
(當然,其實本來是 MapReduce 提到了總是排序的好處)。這個介面很有意思,這裡預設實現是 Hadoop 自己實現的快速排序,它會對分割槽先進行排序,而後保持分割槽內有序(這個也可以通過map.sort.class
設定)。至於寫到什麼檔案裡,你可以在getSpillFileForWrite
裡找到(一般都是 out 字尾),比如%s_spill_%d.out
(前面是作業號,後面是溢位的次數)。
在網上流傳著一種說法:Combiner 應該是一個純函式。不過,其實我們知道,Google 在論文裡就提到了 Map 和 Reduce 也應該是純函式(不過也可以在一定範圍內違反)。當然這個傳言是有原因的,因為 Combine 被多次呼叫了:在sortAndSpill
的最後,以及大家都知道的,在整個 Map 階段的最後,它會呼叫mergeParts
方法。
注意Combine 和 Merge 的含義是不同的。Combine 是按照原文教義的,是一種 accelerate;Merge 是處理臨時檔案的。不論是 Map 還是 Reduce 階段都是緩衝區再加檔案合併。那麼能不能直接寫到結果檔案呢?其實理論上可以,但是沒有意義,因為我們每次有新的溢位時,都要和舊的結果檔案進行合併。這樣做也拖慢了 Map 的輸出。
Spill 之後可選的步驟是合併(combine)和壓縮。Map 的壓縮不僅僅是最後的輸出,它在輸出臨時檔案時就會進行了,所以可以極大提高傳輸的效率(但是以 CPU 佔用為代價)。
Spill 生成的所有分割槽都在一個檔案裡。因此,它需要元資訊來標記分割槽的範圍,這就是SpillRecord
,在 Map Task 裡有一個對應的 ArrayList 集合indexCacheList
,它儲存著所有 Map 臨時檔案 的元資訊。不過,如果它的大小超過了mapreduce.task.index.cache.limit.bytes
,那麼就會溢寫到磁碟,所以這裡也是一個可以調優的地方。
Merge 過程可以通過修改mapreduce.task.io.sort.factor
來增加一次合併的數量(多路歸併),否則的話就會增加迴圈次數。不過,這裡略微比我們想的要複雜:整個檔案列表首先會被排序,這之後會取出要歸併的檔案,組成小根堆,然後迭代堆的值合併——當然這也會生成臨時檔案,且臨時檔案會二分搜尋後插入到當前排序列表。最終,所有檔案會合併為一個。但這個程式碼看起來非常容易讓人迷惑:這個堆MergeQueue
繼承了 Hadoop 自己實現的PriorityQueue
類。它本身的泛型引數是K 和 V,而繼承的PriorityQueue
是Segment<K,V>
,也就是待合併的檔案。而這個類又包含了排序的檔案列表segments
。
Hadoop 其實是利用這個堆找全域性最小值,方法是它將 Segment 的最小值(也就是第一個值)作為排序的 key。這樣,就可以實現一個類似於單出隊的多路歸併效果。要注意的是,取出來的 key 之後就要被更新。這一點我們可以從adjustPriorityQueue
裡看到:
private void adjustPriorityQueue(Segment<K,V> reader) throws IOException{
long startPos = reader.getReader().bytesRead;
boolean hasNext = reader.nextRawKey();
long endPos = reader.getReader().bytesRead;
totalBytesProcessed += endPos - startPos;
mergeProgress.set(Math.min(1.0f,totalBytesProcessed * progPerByte));
if (hasNext) {
adjustTop();
} else {
pop();
reader.close();
}
}
複製程式碼
其中nextRawKey
會把 key 設定為下一個 segment 裡鍵的位置(細節可以看一下IFile
的positionToNextRecord
方法)。調整之後就可以順利的繼續進行堆的調整了。
最後,就是在Merger
的writeFile
方法裡,會對RawKeyValueIterator
進行呼叫,這裡是直接傳的this
。不過我看了一下,Merger
本身在Merge
時就會寫檔案,而之後我們在輸出時仍然會寫 Map 的結果檔案,感覺這樣不是重複寫了兩次?雖然說Merger
是 Map 和 Reduce 複用的,但這樣看起來也並不妥。
Partition
Google 的論文也提到,有些場景很適合自定義分割槽器,他舉的例子是 IP 地址。自定義分割槽器很大的意義之一在於防止資料傾斜——雖然說雜湊是最不容易傾斜的了。比如,Hadoop 提供了TotalOrderPartitioner
,它的額外作用是可以有效分割槽。這個分割槽器很有意思,它本來是實現全序排序的,也就是說它會儲存每個分割槽的最大最小值。這樣一來就構成了一個典型的二分搜尋樹結構——不過,這裡考慮得更細一些,即對於可進行逐位元組比較的型別(實現BinaryComparable
)比如Text
,還可以通過 trie 樹搜尋。
還有一種KeyFieldBasedPartitioner
可以基於某些域來進行雜湊。
Shuffle
也可以說是 Copy,從 Map 端拉取(pull)資料為 Reducer 提供輸入。
getProgress().setStatus("reduce");
setPhase(TaskStatus.Phase.SHUFFLE);
複製程式碼
在 Reducer 初始化的時候,會設定當前 Task 狀態為 SHUFFLE。Shuffle 這個名字也不是沒有體現,比如在 Reducer 獲取到 Map 的輸出以後,會對資料的順序進行 Shuffle,防止熱點。
這裡 Shuffle 階段的程式碼都放在了Shuffle
這個類裡,它實現了ShuffleConsumerPlugin
介面。這個名字看起來有點奇怪,因為它是為了實現Shuffle
的服務化(可由第三方服務提供)。這個可以檢視對應的 commit:
這裡 Shuffle 會啟動一個事件接收器EventFetcher
的獨立執行緒來處理 map 完成的事件。這個接收器內部使用了TaskUmbilicalProtocol
協議。我們不必去深究下層 RPC 通訊的邏輯,只需要知道它包含了getMapCompletionEvents
方法。另外,它的實現類TaskAttemptListenerImpl
其實是用來監聽心跳的(實現了TaskAttemptListener
)。
當收到了完成的事件之後,這些訊息會被ShuffleScheduler
解析。具體的實現在addKnownMapOutput
方法中,它會把解析到的mapHost
,也就是持有 map 輸出的節點放到pendingHosts
裡。mapHost
有這樣幾種狀態:
- IDLE:表示 Map 還沒有完成
- PENDING:已經完成等待處理
- BUSY:表示正被拷貝
- PENALIZED:錯誤,拷貝失敗
不過,這個pendingHosts
並沒有暴露出來,Shuffle
是不可見的。排程器ShuffleScheduler
只暴露一個getHost
的介面給Fetcher
執行緒。這個和EventFetcher
一樣,都是獨立的執行緒,而且這裡還可以用mapreduce.reduce.shuffle.parallelcopies
配置來增加並行數。
ShuffleScheduler
的實現ShuffleSchedulerImpl
是使用synchronized
上鎖,諸如addKnownMapOutput
最後會進行notifyAll
,而getHost
會在pendingHosts
不足時等待。Fetcher
的run
程式碼非常簡潔:
while (!stopped && !Thread.currentThread().isInterrupted()) {
MapHost host = null;
try {
// If merge is on,block
merger.waitForResource();
// Get a host to shuffle from
host = scheduler.getHost();
metrics.threadBusy();
// Shuffle
copyFromHost(host);
} finally {
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
}
}
}
複製程式碼
這裡如果有記憶體 merge 正在執行,會阻塞當前拷貝資料,因為後面的copyFromHost
有可能會觸發歸併操作。
和 Map 不一樣的是,這個Merger
的型別是MergeManager
。它是對 Shuffle 階段歸併的抽象,把歸併劃分到了OnDiskMerger
和inMemoryMerger
兩個單獨的執行緒。它提供了三個方法:waitForResource
,reserve
和close
。
其中reserve
會判斷當前是否溢位,並建立一個臨時的拷貝檔案(也可能在記憶體)。這個閾值是根據mapreduce.reduce.memory.totalbytes
和mapreduce.reduce.shuffle.memory.limit.percent
(預設是0.25)的乘積。如果小於閾值,且總記憶體使用沒有溢位,就生成InMemoryMapOutput
,反之生成OnDiskMapOutput
。它們都實現了MapOutput
介面,當整個拷貝完成的時候會呼叫這個介面的commit
方法,將這個輸入加入到finishedMaps
裡。
InMemoryMapOutput
的commit
很特殊,它會判斷是否超過了緩衝區閾值(預設是總記憶體的0.66)或者檔案數是否超過mapreduce.reduce.merge.memtomem.threshold
(預設是ioSortFactor
),如果超過了就會調inMemoryMerger
;同理,這個OnDiskMapOutput
也會根據檔案的數量來進行歸併(預設是2 * ioSortFactor - 1
)。這裡呼叫是基於一個連結串列pendingToBeMerged
的物件鎖,也就是 notify。
Sort
最後在close
的時候,會把記憶體和磁碟的臨時檔案都合併一次(有可能沒到閾值)。當完成了所有的前期工作後,會呼叫finalMerge
方法,這個方法的核心就是Merger.merge
,也就是 Map 裡相同的歸併流程。這樣,就可以形成一個全域性排序的輸出檔案。因為核心過程都在前面提到了,這裡不再贅述。
copyFromHost
是從HttpURLConnection
裡獲取資料流。
Reduce
Reduce 和 Map 一樣,可以定製OutputFormat
的格式,不過它沒有如何分片。整個程式碼都很簡單,reducerContext
會把 Map 的鍵值對合並,其實就是遍歷鍵值對,一直到下一個不同的鍵(更準確的來說,是在根據groupComparator
來確定下一個鍵,所以它有可能會把幾個 key 值放在一起,但預設是RawComparator
,也就是不分組)為止,然後將所有的值合在一起輸出。
Reduce 必須等待 Shuffle 完成才開始執行,因此有可能會導致 Slot Hoarding 問題。
其實整個 MapReduce 的程式碼有非常多值得看一看的地方,比如,Uber 是如何實現的? 和 AM 通訊是如何實現的?但往往時間所限,我們只能觀其大略,然後實際用到了再翻一翻細節。董西成大佬很早就出了兩本關於 Hadoop 原始碼的書,不過可惜只對 MapReduce 1.x 有比較詳細的描述,而 YARN 的篇幅較少,許多細節已經與現在比較主流的 Hadoop 版本不太一樣了,當然還是值得一看的。
To be continued...