1. 程式人生 > >MapReduce之MapTask工作機制

MapReduce之MapTask工作機制

## 1. 階段定義 `MapTask`:map----->sort `map`:Mapper.map()中將輸出的`key-value`**寫出之前** `sort`:Mapper.map()中將輸出的`key-value`**寫出之後** ## 2. MapTask工作機制 1. **Read階段** MapTask通過使用者編寫的`RecordReader`,從輸入`InputSplit`中解析出一個個`key/value`。 2. **Map階段** 該節點主要是將解析出的key/value交給使用者編寫`map()`函式處理,併產生一系列新的key/value。 3. **Collect收集階段** 在使用者編寫map()函式中,當資料處理完成後,一般會呼叫`OutputCollector.collect()`輸出結果。在該函式內部,它會將生成的key/value分割槽(呼叫`Partitioner`),並寫入一個記憶體緩衝區中,並且會被`Partitioner`計算一個**分割槽號**,按照先後順序分配`index`下標 4. **Spill階段** - 即“溢寫”,在此階段有兩個重要執行緒。==收集執行緒==負責向緩衝區收集資料,緩衝區初始值為100M,當使用到80%閾值,喚醒溢寫執行緒,==溢寫執行緒==會將緩衝區已經收集的資料==溢寫==到磁碟。 - 在溢寫前,會對緩衝區中的資料進行排序(==快速排序==),在排序時,只通過比較key進行排序,只改變index的位置,不交換資料的位置 - 排序後,按照分割槽,依次將資料寫入到磁碟的臨時檔案的若干分割槽中 - 每次溢寫都會生成一個==臨時檔案==,當所有的資料都溢寫完成之後,會將所有的臨時檔案片段==合併==為一個總的檔案 5. **Combine階段** - 在合併時,將所有的臨時檔案的相同分割槽的資料,進行合併,合併後再對所有的資料進行排序(==歸併排序==) - 最終生成一個結果檔案(`output/file.out`),同時生成相應的索引檔案`output/file.out.index`,這個檔案分為若干分割槽,每個分割槽的資料已經按照key進行了排序,等待`reduceTask`的shuffle執行緒來拷貝資料! ## 溢寫階段詳情: - 步驟1:利用快速排序演算法對快取區內的資料進行排序,排序方式是,先按照分割槽編號Partition進行排序,然後按照key進行排序。這樣,經過排序後,資料以分割槽為單位聚集在一起,且同一分割槽內所有資料按照key有序。 - 步驟2:按照分割槽編號由小到大依次將每個分割槽中的資料寫入任務工作目錄下的臨時檔案output/spillN.out(N表示當前溢寫次數)中。如果使用者設定了Combiner,則寫入檔案之前,對每個分割槽中的資料進行一次聚集操作。 - 步驟3:將分割槽資料的元資訊寫到記憶體索引資料結構SpillRecord中,其中每個分割槽的元資訊包括在臨時檔案中的偏移量、壓縮前資料大小和壓縮後資料大小。如果當前記憶體索引大小超過1MB,則將記憶體索引寫到檔案output/spillN.out.inde