Mapper中map方法下context.write的流程與程式碼詳解
本文的分析基於Hadoop 2.4.0版本
任何Map任務在Hadoop中都會被一個MapTask物件所詳細描述,MapTask會最終呼叫其run方法來執行它對應的Map任務,需要執行任務就必須要有相關的輸入輸出資訊,這些資訊都包含在Map任務對應的Context物件中,Context通過RecordReader來獲取輸入資料,Map任務的輸入檔案儲存在InputSplit中,這個InputSplit儲存了檔案的路徑、範圍和位置;通過RecordWriter來儲存處理後的資料,在Context中還有個任務報告器TaskReporter,它不斷向ApplicationMaster報告任務的執行進度
Mapper abstract class Context implements MapContext(介面)
Reducer的abstract class Context implements ReduceContext(介面)
MapContext和ReduceContext都是extends TaskInputOutputContext(介面)
interface TaskInputOutputContext的實現abstractclass TaskInputOutputContextImpl
TaskInputOutputContextImpl的write方法由abstract
Map和Reduce中的Context物件的write方法都是呼叫RecordWriter的write方法
其中RecordWriter有很多的實現類如classMapTask下的privateclass NewOutputCollector、
private class NewDirectOutputCollector
以NewOutputCollector舉例:
public void write(K key, V value)throws IOException, InterruptedException {
collector.collect(key, value,partitioner.getPartition(key,value, partitions));
}
其中collector的型別是MapOutputCollector(介面),最後實現類是static class MapOutputBuffer,是MapTask下的類,方法簽名如下:
public
synchronized void
collect(K key, V value, final
int partition ) throws IOException
在2.x中對1.x進行了改進,1.x中的int [] kvoffsets, int[] kvindices被2.x中的IntBufferkvmeta給替換了
首選會呼叫publicvoid init(MapOutputCollector.Context context 方法進行初始化)
什麼時候呼叫init的呢?
如果是使用runNewMapper的話,當getNumReduceTasks() != 0時,呼叫private <KEY, VALUE> MapOutputCollector<KEY,VALUE>createSortingCollector(JobConfjob, TaskReporter reporter)方法內部呼叫的collector.init(context),其中在NewOutputCollector類的構造方法中呼叫了collector =createSortingCollector(job, reporter),getNumReduceTasks() == 0時,呼叫另一種直接的方式
使用runOldMapper的話,是直接當numReduceTasks > 0時呼叫createSortingCollector(job, reporter),當numReduceTasks =0時,呼叫另一種直接的方式
final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
key和value需要進行序列化的操作
keyClass = (Class<K>)job.getMapOutputKeyClass();
valClass = (Class<V>)job.getMapOutputValueClass();
serializationFactory = new SerializationFactory(job);
keySerializer = serializationFactory.getSerializer(keyClass);
keySerializer.open(bb);
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
初始化操作之後就執行public synchronized void collect(K key,V value, final int partition)方法
然後執行flush方法:將快取中的資料刷到磁碟上
進行spill的時候,先是對快取中的資料進行排序
sorter.sort(MapOutputBuffer.this, mstart, mend,reporter)MapTask的執行主要程式碼:
MapTask的入口是run方法publicvoid run(final JobConf job,final TaskUmbilicalProtocol umbilical)
if (isMapTask()) {
// 如果沒有reducer的任務,map階段會支配所有的程序
if (conf.getNumReduceTasks() == 0) {
mapPhase = getProgress().addPhase("map", 1.0f);
} else {
// 如果有reducer的任務,全部被分為map階段和sort階段,各自佔據一定的處理過程.
mapPhase = getProgress().addPhase("map", 0.667f);
sortPhase =getProgress().addPhase("sort", 0.333f);
}
}
//判斷是否使用新的api
boolean useNewApi = job.getUseNewMapper()
Configuration類中的方法,預設是false
public boolean getUseNewMapper() {
return getBoolean("mapred.mapper.new-api",false);
}
//useNewAPi是關於committer的設定不同,
initialize(job, getJobID(), reporter,useNewApi);
if (useNewApi) {
if (LOG.isDebugEnabled()) {
LOG.debug("using new api for outputcommitter");
}
outputFormat =
ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
committer = outputFormat.getOutputCommitter(taskContext);
} else {
committer = conf.getOutputCommitter();
}
做一些處理操作
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
然後
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
runNewMapper與runOldMapper的不同,對程式碼的不同包裝,實現效果基本相同