1. 程式人生 > 程式設計 >MapReduce 的核心流程

MapReduce 的核心流程

下文中新舊的意思分別代表 Hadoop 0.20 前後。原因是 MapReduce 在這個版本進行了一次大改,主要的特點就是劃分了新舊兩個包名。新版的特點是使用了抽象類代替一些可擴充套件的介面,以及增加了 Context 的概念。比如說,MapContext中就封裝了獲取切片、讀取 Record 等功能,而MapContextImpl就是對 reader 和 writer 邏輯的封裝。

有時候,我們會不小心配置錯誤,比如把mapreduce.job.combine.class配置到了mapred.combiner.class,就可能導致啟用了舊的模組。

org.apache.hadoop.mapred.TaskStatus.Phase

這個列舉中可以看到 task 定義的所有階段:STARTING,MAP,SHUFFLE,SORT,REDUCE,CLEANUP。當然我們在理解的時候可能加入一些自己定義的階段(根據動作),比如 Split。

Split

整個分片在客戶端進行。Split 其實是是邏輯上的分片,但其大小依賴於底層的檔案塊。這個公式搞 Hadoop 的人都很熟悉:

protected long computeSplitSize(long goalSize,long minSize,long blockSize) {
    return Math.max(minSize,Math.min(goalSize,blockSize));
}
複製程式碼

它其實就是保證了minSize \le goalSize \le blockSize。它是FileInputFormat裡專門設定的規則,且這個並不是固定的,而是針對每個檔案(因為 MapReduce 支援傳入路徑,而且是多個路徑)。這裡你就可以看到一個可以調優的點了:顯然這裡需要進行和 namenode 的通訊,這樣就需要允許設定mapreduce.input.fileinputformat.list-status.num-threads增加遍歷檔案資訊時的執行緒數,這很合理(當然,略有一些奇怪的地方是這個配置實際上影響的是客戶端)。

Split的核心類是InputFormat。這個介面必須實現getSplitsgetRecordReader方法,這對應著將輸入的檔案劃分為分片並對映到 Map 的輸入。一般來說,我們預設使用的是TextInputFormat

,它使用了LineRecordReader讀取模式和FileInputFormat的按塊切分。其他的模式也很重要,比如CombineFileInputFormat(可以讀取若干小檔案,這個和 SequenceFile 合在一起不同,只是打包了元資料),比如KeyValueTextInputFormat可以讀取鍵值對,比如NLineInputFormat允許指定切分的行數(其實我覺得應該實現在TextInputFormat裡)。LineRecordReader還有一個細節是它實現了對壓縮檔案的自解釋。

這裡還是有一些值得細想的地方。比如,當獲取了檔案的分塊資訊之後,如何對任務的劃分進行優化(本地化)?如果是本地檔案,如何寫入 HDFS 並補全元資訊?

顯然,本地化已經涉及到作業排程,所以這裡的邏輯應該在 AM 處理。因此這裡JobSubmitter會將資料進行序列化(具體可以參考writeJobSplitMetaInfo),之後在JobImpl.InitTransitioncreateSplits方法可以看到它讀取了一組TaskSplitMetaInfo資訊。JobImpl中只會建立 Map 和 Reduce 任務,具體的排程就要參考排程器的實現了。

listStatus呼叫getFileSystem後,會根據 schema 得到對應的FileSystem實現類,本地就是LocalFileSystem。這裡並沒有做特殊的處理,而是將這個位置也作為元資訊傳遞了。

Map

Mapper裡的MapContext會從InputFormat裡取 RecordReader 作為記錄讀取的方法。這裡的實現比較簡單而且不太重要,重點是理解 Mapper 的run方法:

public void run(Context context) throws IOException,InterruptedException {
    setup(context);
    try {
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(),context.getCurrentValue(),context);
    }
    } finally {
    cleanup(context);
    }
}
複製程式碼

其中setupcleanup只呼叫一次,都是平時 MR 常見的回撥點。

我們知道在 Map 中會將結果分割槽寫入不同檔案。這裡提供了一個環形緩衝區(類似於其他的緩衝設計,比如訊息佇列)MapOutputBuffer來平滑輸出,它繼承了MapOutputCollector。在每次緩衝區溢位(spill,其實閾值為80%)後,都會寫入一個臨時檔案。之後,Map 會合並臨時檔案到最終的分割槽檔案。這個過程有大量的配置優化點,比如mapreduce.task.io.sort.mb(控制緩衝區大小),mapreduce.task.io.sort.factor(控制進行 merge 的檔案數,另外這個也針對 Reduce),mapreduce.map.output.compress(進行 map 結果壓縮)等等。

注意,分割槽不代表寫入不同檔案,而是檔案的不同位置

Spill

spill 過程是由獨立的執行緒執行的,不過為什麼不能和 Reducer 一樣多個執行緒寫呢?主要是這種多寫者問題幾乎必然是有鎖的(無鎖演演算法必須使用其他特定資料結構),效率並不比單寫者高。

在寫入磁碟前,Map 會進行排序(sort),這點比較容易忽略。你可以這樣記住它:MapOutputBuffer實現了IndexedSortable(當然,其實本來是 MapReduce 提到了總是排序的好處)。這個介面很有意思,這裡預設實現是 Hadoop 自己實現的快速排序,它會對分割槽先進行排序,而後保持分割槽內有序(這個也可以通過map.sort.class設定)。至於寫到什麼檔案裡,你可以在getSpillFileForWrite裡找到(一般都是 out 字尾),比如%s_spill_%d.out(前面是作業號,後面是溢位的次數)。

在網上流傳著一種說法:Combiner 應該是一個純函式。不過,其實我們知道,Google 在論文裡就提到了 Map 和 Reduce 也應該是純函式(不過也可以在一定範圍內違反)。當然這個傳言是有原因的,因為 Combine 被多次呼叫了:在sortAndSpill的最後,以及大家都知道的,在整個 Map 階段的最後,它會呼叫mergeParts方法。

注意Combine 和 Merge 的含義是不同的。Combine 是按照原文教義的,是一種 accelerate;Merge 是處理臨時檔案的。不論是 Map 還是 Reduce 階段都是緩衝區再加檔案合併。那麼能不能直接寫到結果檔案呢?其實理論上可以,但是沒有意義,因為我們每次有新的溢位時,都要和舊的結果檔案進行合併。這樣做也拖慢了 Map 的輸出。

Spill 之後可選的步驟是合併(combine)和壓縮。Map 的壓縮不僅僅是最後的輸出,它在輸出臨時檔案時就會進行了,所以可以極大提高傳輸的效率(但是以 CPU 佔用為代價)。

Spill 生成的所有分割槽都在一個檔案裡。因此,它需要元資訊來標記分割槽的範圍,這就是SpillRecord,在 Map Task 裡有一個對應的 ArrayList 集合indexCacheList,它儲存著所有 Map 臨時檔案 的元資訊。不過,如果它的大小超過了mapreduce.task.index.cache.limit.bytes,那麼就會溢寫到磁碟,所以這裡也是一個可以調優的地方。

Merge 過程可以通過修改mapreduce.task.io.sort.factor來增加一次合併的數量(多路歸併),否則的話就會增加迴圈次數。不過,這裡略微比我們想的要複雜:整個檔案列表首先會被排序,這之後會取出要歸併的檔案,組成小根堆,然後迭代堆的值合併——當然這也會生成臨時檔案,且臨時檔案會二分搜尋後插入到當前排序列表。最終,所有檔案會合併為一個。但這個程式碼看起來非常容易讓人迷惑:這個堆MergeQueue繼承了 Hadoop 自己實現的PriorityQueue類。它本身的泛型引數是K 和 V,而繼承的PriorityQueueSegment<K,V>,也就是待合併的檔案。而這個類又包含了排序的檔案列表segments

Hadoop 其實是利用這個堆找全域性最小值,方法是它將 Segment 的最小值(也就是第一個值)作為排序的 key。這樣,就可以實現一個類似於單出隊的多路歸併效果。要注意的是,取出來的 key 之後就要被更新。這一點我們可以從adjustPriorityQueue裡看到:

private void adjustPriorityQueue(Segment<K,V> reader) throws IOException{
    long startPos = reader.getReader().bytesRead;
    boolean hasNext = reader.nextRawKey();
    long endPos = reader.getReader().bytesRead;
    totalBytesProcessed += endPos - startPos;
    mergeProgress.set(Math.min(1.0f,totalBytesProcessed * progPerByte));
    if (hasNext) {
    adjustTop();
    } else {
    pop();
    reader.close();
    }
}
複製程式碼

其中nextRawKey會把 key 設定為下一個 segment 裡鍵的位置(細節可以看一下IFilepositionToNextRecord方法)。調整之後就可以順利的繼續進行堆的調整了。

最後,就是在MergerwriteFile方法裡,會對RawKeyValueIterator進行呼叫,這裡是直接傳的this。不過我看了一下,Merger本身在Merge時就會寫檔案,而之後我們在輸出時仍然會寫 Map 的結果檔案,感覺這樣不是重複寫了兩次?雖然說Merger是 Map 和 Reduce 複用的,但這樣看起來也並不妥。

Partition

Google 的論文也提到,有些場景很適合自定義分割槽器,他舉的例子是 IP 地址。自定義分割槽器很大的意義之一在於防止資料傾斜——雖然說雜湊是最不容易傾斜的了。比如,Hadoop 提供了TotalOrderPartitioner,它的額外作用是可以有效分割槽。這個分割槽器很有意思,它本來是實現全序排序的,也就是說它會儲存每個分割槽的最大最小值。這樣一來就構成了一個典型的二分搜尋樹結構——不過,這裡考慮得更細一些,即對於可進行逐位元組比較的型別(實現BinaryComparable)比如Text,還可以通過 trie 樹搜尋。

還有一種KeyFieldBasedPartitioner可以基於某些域來進行雜湊。

Shuffle

也可以說是 Copy,從 Map 端拉取(pull)資料為 Reducer 提供輸入。

getProgress().setStatus("reduce"); 
setPhase(TaskStatus.Phase.SHUFFLE);  
複製程式碼

在 Reducer 初始化的時候,會設定當前 Task 狀態為 SHUFFLE。Shuffle 這個名字也不是沒有體現,比如在 Reducer 獲取到 Map 的輸出以後,會對資料的順序進行 Shuffle,防止熱點。

這裡 Shuffle 階段的程式碼都放在了Shuffle這個類裡,它實現了ShuffleConsumerPlugin介面。這個名字看起來有點奇怪,因為它是為了實現Shuffle的服務化(可由第三方服務提供)。這個可以檢視對應的 commit:

svn.apache.org/viewvc?view…

這裡 Shuffle 會啟動一個事件接收器EventFetcher的獨立執行緒來處理 map 完成的事件。這個接收器內部使用了TaskUmbilicalProtocol 協議。我們不必去深究下層 RPC 通訊的邏輯,只需要知道它包含了getMapCompletionEvents方法。另外,它的實現類TaskAttemptListenerImpl其實是用來監聽心跳的(實現了TaskAttemptListener)。

當收到了完成的事件之後,這些訊息會被ShuffleScheduler解析。具體的實現在addKnownMapOutput方法中,它會把解析到的mapHost,也就是持有 map 輸出的節點放到pendingHosts裡。mapHost有這樣幾種狀態:

  • IDLE:表示 Map 還沒有完成
  • PENDING:已經完成等待處理
  • BUSY:表示正被拷貝
  • PENALIZED:錯誤,拷貝失敗

不過,這個pendingHosts並沒有暴露出來,Shuffle是不可見的。排程器ShuffleScheduler只暴露一個getHost的介面給Fetcher執行緒。這個和EventFetcher一樣,都是獨立的執行緒,而且這裡還可以用mapreduce.reduce.shuffle.parallelcopies配置來增加並行數。

ShuffleScheduler的實現ShuffleSchedulerImpl是使用synchronized上鎖,諸如addKnownMapOutput最後會進行notifyAll,而getHost會在pendingHosts不足時等待。Fetcherrun程式碼非常簡潔:

while (!stopped && !Thread.currentThread().isInterrupted()) {
  MapHost host = null;
  try {
    // If merge is on,block
    merger.waitForResource();

    // Get a host to shuffle from
    host = scheduler.getHost();
    metrics.threadBusy();

    // Shuffle
    copyFromHost(host);
  } finally {
    if (host != null) {
    scheduler.freeHost(host);
    metrics.threadFree();            
    }
  }
}
複製程式碼

這裡如果有記憶體 merge 正在執行,會阻塞當前拷貝資料,因為後面的copyFromHost有可能會觸發歸併操作。

和 Map 不一樣的是,這個Merger的型別是MergeManager。它是對 Shuffle 階段歸併的抽象,把歸併劃分到了OnDiskMergerinMemoryMerger兩個單獨的執行緒。它提供了三個方法:waitForResourcereserveclose

其中reserve會判斷當前是否溢位,並建立一個臨時的拷貝檔案(也可能在記憶體)。這個閾值是根據mapreduce.reduce.memory.totalbytesmapreduce.reduce.shuffle.memory.limit.percent(預設是0.25)的乘積。如果小於閾值,且總記憶體使用沒有溢位,就生成InMemoryMapOutput,反之生成OnDiskMapOutput。它們都實現了MapOutput介面,當整個拷貝完成的時候會呼叫這個介面的commit方法,將這個輸入加入到finishedMaps裡。

InMemoryMapOutputcommit很特殊,它會判斷是否超過了緩衝區閾值(預設是總記憶體的0.66)或者檔案數是否超過mapreduce.reduce.merge.memtomem.threshold(預設是ioSortFactor),如果超過了就會調inMemoryMerger;同理,這個OnDiskMapOutput也會根據檔案的數量來進行歸併(預設是2 * ioSortFactor - 1)。這裡呼叫是基於一個連結串列pendingToBeMerged的物件鎖,也就是 notify。

Sort

最後在close的時候,會把記憶體和磁碟的臨時檔案都合併一次(有可能沒到閾值)。當完成了所有的前期工作後,會呼叫finalMerge方法,這個方法的核心就是Merger.merge,也就是 Map 裡相同的歸併流程。這樣,就可以形成一個全域性排序的輸出檔案。因為核心過程都在前面提到了,這裡不再贅述。

copyFromHost是從HttpURLConnection裡獲取資料流。

Reduce

Reduce 和 Map 一樣,可以定製OutputFormat的格式,不過它沒有如何分片。整個程式碼都很簡單,reducerContext會把 Map 的鍵值對合並,其實就是遍歷鍵值對,一直到下一個不同的鍵(更準確的來說,是在根據groupComparator來確定下一個鍵,所以它有可能會把幾個 key 值放在一起,但預設是RawComparator,也就是不分組)為止,然後將所有的值合在一起輸出。

Reduce 必須等待 Shuffle 完成才開始執行,因此有可能會導致 Slot Hoarding 問題。


其實整個 MapReduce 的程式碼有非常多值得看一看的地方,比如,Uber 是如何實現的? 和 AM 通訊是如何實現的?但往往時間所限,我們只能觀其大略,然後實際用到了再翻一翻細節。董西成大佬很早就出了兩本關於 Hadoop 原始碼的書,不過可惜只對 MapReduce 1.x 有比較詳細的描述,而 YARN 的篇幅較少,許多細節已經與現在比較主流的 Hadoop 版本不太一樣了,當然還是值得一看的。

To be continued...