1. 程式人生 > >MapReduce系列之Mapper、Combiner和Reducer

MapReduce系列之Mapper、Combiner和Reducer

  • Mapper類

MapReduce的Mapper類中共有四個父類的方法:setup()、map()、run()、cleanup()

setup()方法:在一個任務中,此方法只在開始執行一次,適用於對相關變數和資源的初始化操作。

map()方法:任務實現的主要過程在此方法中執行,只要FileInputFormat解析出KV對,則此方法便執行一次,重複執行直到Split檔案處理完成。使用者可以重寫這個方法,但是預設的時候通常會呼叫setup而啟動這個程式。這個函式預設並不做什麼有用的 事情,但是可以被使用者覆蓋重寫以便於設定任務(例如初始化類的變數),當設定完成之後,分片的每一個鍵值對會激發map()方法。因此map()接收到一個鍵,一個值,以及一個上下文context。使用這個上下文物件,一個map就會儲存其輸出到快取中。

請注意,map分片是按塊擷取的(例如64kb),每一個塊分割成為若干鍵值對的資料( SplitLineReader.class乾的好事),這是在Mapper.Context.nextKeyValue內部完成的。當map分片被全部處理之後,run()會呼叫clean()方法。預設的,沒有什麼會被執行,除非使用者重寫覆蓋他。

map會使用Mapper.Context.write()將map函式的輸出溢寫到記憶體中的環形緩衝區 (MapTask.MapOutputBuffer)。緩衝區的大小是固定的,通過mapreduce.task.io.sort.mb (default: 100MB)指定。


任何時候當這個緩衝區將要充滿的時候(mapreduce.map. sort.spill.percent: 預設80% ),溢寫將會被執行(這是一個並行過程,使用的是單獨的執行緒,緩衝池還可以繼續被寫入)。如果溢寫執行緒太慢,而緩衝區又忙了的話,map()就會暫停執行而等待。
溢寫執行緒執行下面的動作:
1、建立一個溢寫記錄SpillRecord 和一個FSOutputStream 檔案輸出流(本地檔案系統)
2、記憶體內排序緩衝中的塊:輸出的資料會使用快排演算法按照partitionIdx, key排序
3、排序之後的輸出會分割成為分割槽:每一個分割槽對應一個reduce
4、分割槽序列化寫到本地檔案

run()方法:提供了setup->map->cleanup執行的模版,一般此方法不重寫,

cleanup()方法:此方法在一個任務中僅執行一次,實現變數和資源的釋放工作。

   例子很多,使用者必須要自己實現這個類。在驅動中指定定義的Mapper實現類:

job.setMapperClass(**Mapper.class);

 

  • Combiner類

       Mapper端的Reducer。但不能改變Mapper端的Key和Value的輸出型別。

  • Reducer類

       和Mapper類似,但主要功能不同,主要是對Mapper的輸出進行歸約。