MapReduce source code: the Mapper and Reducer methods explained
阿新 • Published: 2018-11-10
Methods of the Mapper class in the source
/**
* The <code>Context</code> passed on to the {@link Mapper} implementations.
*/
public abstract class Context
implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
Context is the object a mapper uses to write its output to the reduce phase or to the next stage.
/**
 * Called once at the beginning of the task.
 */
protected void setup(Context context
                     ) throws IOException, InterruptedException {
  // NOTHING
}
Called once when the task starts.
/**
 * Called once for each key/value pair in the input split. Most applications
 * should override this, but the default is the identity function.
 * The key and value passed in here are the input pair.
 */
@SuppressWarnings("unchecked")
protected void map(KEYIN key, VALUEIN value,
                   Context context) throws IOException, InterruptedException {
  // Write the output key/value pair; context is the task context that manages the output.
  context.write((KEYOUT) key, (VALUEOUT) value);
}
This method carries the core business logic of the whole map phase.
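To make the idea of overriding map concrete, here is a minimal sketch in plain Java. It does not use the Hadoop classes: `PairCollector` is a hypothetical stand-in for the Context that only collects what is written, and `WordSplitMapSketch.map` plays the role of an overridden map that receives a line offset as the key and the line text as the value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Context: it only collects the written pairs.
class PairCollector {
    final List<String> pairs = new ArrayList<>();

    void write(String key, int value) {
        pairs.add(key + "=" + value);
    }
}

class WordSplitMapSketch {
    // Plays the role of an overridden map(): called once per input record,
    // here with the line offset as key and the line text as value.
    static void map(long offset, String line, PairCollector context) {
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                context.write(word, 1);   // emit (word, 1) for each token
            }
        }
    }

    static List<String> demo() {
        PairCollector context = new PairCollector();
        map(0L, "hello world hello", context);
        return context.pairs;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints [hello=1, world=1, hello=1]
    }
}
```

In a real job the same logic would live in a subclass of Mapper and write through the Context passed to map.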
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
Called once when the task ends.
/**
 * Expert users can override this method for more complete control over the
 * execution of the Mapper.
 * @param context
 * @throws IOException
 */
public void run(Context context) throws IOException, InterruptedException {
  // Initialization (for example, building collections or loading lookup tables)
  setup(context);
  try {
    while (context.nextKeyValue()) {
      // Core business logic
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    // Final step: close streams and release resources
    cleanup(context);
  }
}
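run defines the order in which the Mapper methods execute: setup once, map once per key/value pair, and cleanup once at the end, even if map throws.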
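The call order in run can be checked with a small plain-Java sketch. `SketchMapper` below is a simplified stand-in, not the Hadoop class: it keeps only the setup / map / cleanup control flow, replaces the Context with an iterator and a list, and records each call in a log. All names in it are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for Hadoop's Mapper (NOT the real class).
class SketchMapper<IN, OUT> {
    protected final List<String> log = new ArrayList<>();   // records the call order
    protected final List<OUT> output = new ArrayList<>();   // stand-in for context.write

    protected void setup() { log.add("setup"); }

    @SuppressWarnings("unchecked")
    protected void map(IN record) {
        log.add("map");
        output.add((OUT) record);   // identity by default, like the real map()
    }

    protected void cleanup() { log.add("cleanup"); }

    // Same shape as Mapper.run(): setup, one map() per record, cleanup in finally.
    public void run(Iterator<IN> records) {
        setup();
        try {
            while (records.hasNext()) {
                map(records.next());
            }
        } finally {
            cleanup();
        }
    }
}

class MapperLifecycleDemo {
    static List<String> runDemo() {
        SketchMapper<String, String> mapper = new SketchMapper<>();
        mapper.run(List.of("a", "b", "c").iterator());
        return mapper.log;
    }

    // A map() that fails on the second record: cleanup still runs.
    static List<String> runFailingDemo() {
        SketchMapper<String, String> mapper = new SketchMapper<String, String>() {
            @Override
            protected void map(String record) {
                if (record.equals("b")) {
                    throw new IllegalStateException("bad record");
                }
                super.map(record);
            }
        };
        try {
            mapper.run(List.of("a", "b", "c").iterator());
        } catch (IllegalStateException e) {
            mapper.log.add("failed");
        }
        return mapper.log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());          // prints [setup, map, map, map, cleanup]
        System.out.println(runFailingDemo());   // prints [setup, map, cleanup, failed]
    }
}
```

The second demo shows why cleanup sits in a finally block: resources opened in setup are released even when a record makes map throw.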
The Reducer class
/**
* The <code>Context</code> passed on to the {@link Reducer} implementations.
*/
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
The Context is responsible for writing out the data.
/**
* Called once at the start of the task.
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
Called at the start of the task; used for initialization.
/**
* This method is called once for each key. Most applications will define
* their reduce class by overriding this method. The default implementation
* is an identity function.
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
This is where the concrete Reducer business logic goes.
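The difference between the default identity reduce and an overridden one can be sketched in plain Java. `IdentityReducerSketch` is a simplified stand-in, not the Hadoop Reducer: its `write` method replaces context.write, and `SumReducerSketch` is a hypothetical subclass that sums the values of a key.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Hadoop's Reducer (NOT the real class).
class IdentityReducerSketch<K, VIN, VOUT> {
    final List<String> output = new ArrayList<>();

    // Stand-in for context.write(key, value).
    void write(K key, VOUT value) {
        output.add(key + "=" + value);
    }

    // Default behaviour, mirroring the source: write every value unchanged.
    @SuppressWarnings("unchecked")
    protected void reduce(K key, Iterable<VIN> values) {
        for (VIN value : values) {
            write(key, (VOUT) value);
        }
    }
}

// Hypothetical override: collapse all values of a key into their sum.
class SumReducerSketch extends IdentityReducerSketch<String, Integer, Integer> {
    @Override
    protected void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        write(key, sum);
    }
}

class ReduceOverrideDemo {
    static List<String> identity() {
        IdentityReducerSketch<String, Integer, Integer> reducer = new IdentityReducerSketch<>();
        reducer.reduce("hello", List.of(1, 1, 1));
        return reducer.output;
    }

    static List<String> summed() {
        SumReducerSketch reducer = new SumReducerSketch();
        reducer.reduce("hello", List.of(1, 1, 1));
        return reducer.output;
    }

    public static void main(String[] args) {
        System.out.println(identity());   // prints [hello=1, hello=1, hello=1]
        System.out.println(summed());     // prints [hello=3]
    }
}
```

The identity version emits one output pair per input value, while the override emits a single pair per key, which is the usual reason to override reduce.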
/**
* Called once at the end of the task.
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
Wrap-up work at the end of the task, such as closing streams.
/**
* Advanced application writers can use the
* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
* control how the reduce task works.
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
// If a back up store is used, reset it
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
run ties all of the methods together: setup once, reduce once per key, and cleanup once at the end.
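As a rough illustration of that loop, the plain-Java sketch below drives a reduce call once per key. It is not the Hadoop implementation: a `TreeMap` stands in for the sorted, grouped input that `context.nextKey()` and `context.getValues()` would expose, the backup-store reset is left out, and every class name is an assumption made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified stand-in for the Reducer.run() control flow (NOT the real class).
class RunLoopReducerSketch {
    final List<String> log = new ArrayList<>();

    protected void setup() { log.add("setup"); }

    // Called once per key with all values that belong to that key.
    protected void reduce(String key, Iterable<Integer> values) {
        int sum = 0;
        for (int value : values) {
            sum += value;
        }
        log.add("reduce " + key + " -> " + sum);
    }

    protected void cleanup() { log.add("cleanup"); }

    public void run(Map<String, List<Integer>> grouped) {
        setup();
        try {
            // Each map entry plays the role of one successful context.nextKey().
            for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
                reduce(entry.getKey(), entry.getValue());
            }
        } finally {
            cleanup();
        }
    }
}

class ReducerRunDemo {
    static List<String> runDemo() {
        // Assumed input: pairs already sorted and grouped by key.
        Map<String, List<Integer>> grouped = new TreeMap<>();
        grouped.put("b", List.of(1));
        grouped.put("a", List.of(1, 1));

        RunLoopReducerSketch reducer = new RunLoopReducerSketch();
        reducer.run(grouped);
        return reducer.log;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // prints [setup, reduce a -> 2, reduce b -> 1, cleanup]
    }
}
```

Keys arrive in sorted order and each key triggers exactly one reduce call, with cleanup guaranteed by the finally block.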