大數據:Mapper輸出緩沖區MapOutputBuffer
現在我們知道了Map的輸入端,緊接著我們看map的輸出,這裏重點就是context.write這個語句的內涵。獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
搞清Mapper作為參數傳給map的context,這裏我們看Mapper的run被調用的時候作為了參數傳遞下來。調用Mapper.run的是MapTask. runNewMapper。到這裏我們深究一下runNewMapper。我們看MapTask的run方法:我們重點看runNewMapper
public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; if (isMapTask()) { // If there are no reducers then there won‘t be any sort. Hence the map // phase will govern the entire attempt‘s progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt‘s progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical);獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114 boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi); // check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; } if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter);
}
當我們點runNewMapper的時候就能進入真正實現:
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewMapper(final JobConf job,final TaskSplitIndex splitIndex,final TaskUmbilicalProtocol umbilical,TaskReporter reporter) throws IOException, ClassNotFoundException,
InterruptedException {
// make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(),reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
//確定該用哪一種具體的Mapper,然後創建。獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
(org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
//確定輸入的文件格式
// rebuild the input split
org.apache.hadoop.mapreduce.InputSplit split = null;
split = getSplitDetails(new Path(splitIndex.getSplitLocation()),splitIndex.getStartOffset());//確定這個Mapper所用的輸入是哪一個split
LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
new NewTrackingRecordReader<INKEY,INVALUE>
(split, inputFormat, reporter, taskContext);
//創建和InputFormat相稱的RecordReader
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object
if (job.getNumReduceTasks() == 0) {
//如果設置的reduce個數是0,就直接輸出。
output =
new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
} else {
output = new NewOutputCollector(taskContext, job, umbilical, reporter);
}
接下來我們看一下NewOutputCollector源碼 獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
JobConf job,
TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, ClassNotFoundException {
collector = createSortingCollector(job, reporter);
//創建通向排序階段的collecter
partitions = jobContext.getNumReduceTasks();
//通過獲取Reduce數量來獲得partitions數量。兩個數量一一對應
if (partitions > 1) {
//獲取的partitions 數量大於1
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
//ReflectionUtils.newInstance創建用戶設置的Partitioner,裏邊的參數jobContext.getPartitionerClass()是對抽象類的某種擴充,表示自己可以書寫一個Partitioner類,通過這個方法來獲取,如果沒有自己寫,就是用默認的HashPartitioner
} else {
partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
@Override
public int getPartition(K key, V value, int numPartitions) {
return partitions - 1;
}//只有一個partition就動態擴充抽象類Partitioner類
};
}
}
回到runNewMapper源碼:
org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
mapContext =
new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), input, output, committer, reporter, split);
//創建一個用於Mapper的Context。
org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
mapperContext =
new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(mapContext);
//把上邊創建的mapContext通過getMapContext獲取過來最終傳遞給mapperContext ,我們繼續看getMapContext源碼
public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
return new Context(mapContext);
}
//這裏返回了Context對象,在查看Context對象。獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
this.mapContext = mapContext;
}
//我們看到獲取了mapContext 的值。所以我們知道WrappedMapper-->Context-->mapContext是一個MapContextImpl。
try {
input.initialize(split, mapperContext);
//初始化input,input是recordReader對象,split和mapperContext作為參數
mapper.run(mapperContext);
//我們知道這個run方法運行的是Mapper的run方法,所以看一下這個run
public void run(Context context) throws IOException, InterruptedException {
setup(context);
//獲取context
try {
while (context.nextKeyValue()) {
//通過nextKeyValue來控制運行
map(context.getCurrentKey(), context.getCurrentValue(), context);
//運行了map方法,給了recordReader提供過來的鍵值對。
}
} finally {
cleanup(context);
}
}
回到MapTask源碼
mapPhase.complete();
//上鎖
setPhase(TaskStatus.Phase.SORT);
//所有的task結果進行排序
statusUpdate(umbilical);
//更新runNewMapper狀態。
input.close();
//關閉輸入流
input = null;
output.close(mapperContext);
//關閉輸出流
output = null;
} finally {
closeQuietly(input);
closeQuietly(output, mapperContext);
}
}
對於輸入格式和分片以前已經詳細說過了,需要註意NewTrackingRecordReader。我們知道有了InputFormat之後需要創建與他對應的RecordReader。但是在RecordReader上是用NewTrackingRecordReader。不同之處在於Tracking,是一個跟蹤,對RecordReader的跟蹤,他這裏有一個參數reporter,就是用來上報跟蹤結果的,RecordReader則沒有這個功能。
和輸出有關的是collecter,是輸出數據的收集器,context.write最後就通過RecodWriter落實到collector.collect上。RecordWriter和RecordReader是同一個層次。RecodWriter是hadoop定義個一個抽象類,具體的RecodWriter就是對這個抽象類的擴充。用於maptask的就是NewDrictDoutputCollecter和NewOutputCollecter。
這兩個類叫做OutputCollecter,實際上都是RecordWriter。Collecter只是一種語意的描述。從Mapper的角度看是Writer,是輸出。從框架或下遊的角度看是Collect,是收集。
如果reducer數量是0,就是沒有reducer,Mapper的輸出就是整個MR的輸出,這個時候用RecordWriter的NewDrictDoutputCollecter,直接輸出。相反至少有一個Reducer,那麽使用的就是RecordWriter的NewOutputCollecter。這是我們註重的重點內容。我們看NewOutputCollecter源碼。定義了幾個內容:
collector = createSortingCollector(job, reporter);
//實現MapOutputCollector
partitions = jobContext.getNumReduceTasks();
//負責Mapper輸出的分區
partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
//分發目標的個數,也就是Reducer的個數。
@Override
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
//write只寫不讀。
@Override
public void close(TaskAttemptContext context
) throws IOException,InterruptedException {
try {
collector.flush();
} catch (ClassNotFoundException cnf) {
throw new IOException("can‘t find class ", cnf);
}
collector.close();
}
}
NewOutputCollector分成兩部分,一個是collecter還有一個是partitioner。collecter負責實際收集Mapper輸出並交付給Reducer的工作,partitioner負責決定把具體的輸出交給哪一個Reducer。
有多個Reducer存在,MR框架需要把每個Mapper的每項輸出,也就是收集到的所有的KV對。按照某種條件(就是Partioner的實現方式,默認就是HashPartitioner)輸出到不同的Reducer。這樣就把Mapper的輸出劃分成了多個分區(Partition),有幾個Reducer就把每個Mapper還分成幾個Partition,Partitioner就是起到劃分的作用。hash的方式。。。。。。。。。。。。
所以在創建NewOutputCollector的構造函數中,就要把具體的collector和partitioner創建好。
hadoop的源碼中定義了MapOutputCollector。凡是實現了這個類,除了init和close方法外,還必須提供collect和flush這兩個函數,從NewOutputCollector知道這兩個函數的調用者是collector,創建collector的方式是通過createSortingCollector來完成的。並且還實現了對KV對的排序。從屬關系如下:
YarnChild.main->PrivilegeExceptionAction.run->Maptask.run-->RunNewMapper->NewOutputCollector->MapTask.createSortingCollector
那麽我們來看一下createSortingCollector源碼。獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
createSortingCollector(JobConf job, TaskReporter reporter)
throws IOException, ClassNotFoundException {
MapOutputCollector.Context context =
new MapOutputCollector.Context(this, job, reporter);
Class<?>[] collectorClasses = job.getClasses(
JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
//如果沒有添加設置就默認使用MapOutputBuffer.class
int remainingCollectors = collectorClasses.length;
for (Class clazz : collectorClasses) {
//逐一實驗設置的collectorClasses
try {
if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
throw new IOException("Invalid output collector class: " + clazz.getName() +
" (does not implement MapOutputCollector)");
//這裏告訴我們必須實現MapOutputCollector.class
}
Class<? extends MapOutputCollector> subclazz =
clazz.asSubclass(MapOutputCollector.class);
LOG.debug("Trying map output collector class: " + subclazz.getName());
//獲取日誌
MapOutputCollector<KEY, VALUE> collector =
ReflectionUtils.newInstance(subclazz, job);
//創建collector對象。
collector.init(context);
//初始化collector,實際上初始化的是MapOutputBuffer對象
LOG.info("Map output collector class = " + collector.getClass().getName());
return collector;
//沒有異常就成功了。
} catch (Exception e) {
String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
if (--remainingCollectors > 0) {
msg += " (" + remainingCollectors + " more collector(s) to try)";
}
LOG.warn(msg, e);
}
}
throw new IOException("Unable to initialize any output collector");
}
具體采用什麽collector是可以在配置文件mapred-default.xml中設置的,這裏的MAP_OUTPUT_COLLECTOR_CLASS_ATTR即mapreduce.job.output.collector.class.如果文件中沒有設置就使用默認的MapOutputBuffer。所以實際創建的collcter就是Mapask的MapOutputBuffer。這個類是Maptask的內部類,實現了MapOutputCollector。
可想而知,如果我們另寫一個實現了MapOutputCollectior的Collector,並修改配置文件mapred-default.xml中隊配置項的設置。那麽就可以創建不是MapTask.MapOutputBuffer。那樣createSortingCollector創建的就是一個沒有排序功能的collector。我們知道MapReduce框架之所以是工作流不是數據流的原因就是因為Mapper和Reducer之間的排序。因為Sort只有在所有數據到來之後才能完成。sort完之後所有數據才被Rducer拉取。那麽沒有了sort之後代表數據可以不斷的流入而不是一次性的填充,MR給我們提供了這種可能性,就是通過寫一個不排序的Collector來替代MapOutputBuffer。我們接下來還是把註意力放到runNewMapper上。
當創建了collector和partitioner之後就是Context,MapTask在調用mapper.run時作為參數的是mapperContext,這個對象的類型是WrappedMapper.Context,整個過程是MapContextImpl創建了mapContext對象,通過WrappedMapper對象(是對Mapper的擴充,根據名字就可以知道是對Mapper的包裝區別就是在內部定義了Context類),把一個擴充的Mapper.Context包裝在Mapper內部,這就是WrappedMapper.Context類對象。下面是部分代碼;
public class WrappedMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
public Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context
getMapContext(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
return new Context(mapContext);
}
@InterfaceStability.Evolving
public class Context
extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context {
protected MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext;
//MapContext類。被MapContextImpl實現
public Context(MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> mapContext) {
this.mapContext = mapContext;
}
/**
* Get the input split for this map.
*/
public InputSplit getInputSplit() {
return mapContext.getInputSplit();
}
@Override
public KEYIN getCurrentKey() throws IOException, InterruptedException {
return mapContext.getCurrentKey();
}
@Override
public VALUEIN getCurrentValue() throws IOException, InterruptedException {
return mapContext.getCurrentValue();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
return mapContext.nextKeyValue();
}
WrappedMapper.Context是對Mapper.Context的擴充。內部mapContext,它的構造函數Context中的this.mapContext就設置成這個MapContextImpl類對象mapContext。WrappedMapper.Context擴充了Mapper.Context的write、getCurrentKey、nextKeyValue等。
傳給mapper.run的context就是WrappedMapper.Context對象。裏面的mapContext是MapContextImpl對象。
我們繼續看Mapper.map的context.write
關系是:MapTask.run->runNewMapper->Mapper.run->Mapper.map
按照這個關系找到了一個沒有做任何事的方法。
public void write(KEYOUT key, VALUEOUT value)
throws IOException, InterruptedException;
我們需要找一個實現,這裏找到的就是WrappedMapper.Context.write
就是這一段:
public void write(KEYOUT key, VALUEOUT value) throws IOException,
InterruptedException {
mapContext.write(key, value);
}
這裏的調用的其實是MapContextImpl.write。所以我們找到MapContextImpl。當我們看到MapContextImpl源碼是看到繼承了TaskInputOutputContextImpl我們找到了
public void write(KEYOUT key, VALUEOUT value
) throws IOException, InterruptedException {
output.write(key, value);
}
找到這裏我們還是沒有找到真正的實現,這裏的witer實際上調用的是,NewOutputCollector.writer。
public void write(K key, V value) throws IOException, InterruptedException {
collector.collect(key, value,
partitioner.getPartition(key, value, partitions));
}
繞了一大圈之後我們發現最終回到了NewOutputCollector,這裏的write和之前的有明顯區別是collect實現的,裏面有了分區。我們找的目的是一定要找到write中真正實現了分區寫。
我們知道context是個WrappedMappe.Context對象,所以context.write其實就是就是Wrapped.Context.write,這個函數轉而調用內部成分mapContext的write函數,而mapContext是個MapContextImpl對象,所以實際調用的是MoapCntextImpl.write。然而MapContextImpl中沒有提供write函數,但是我們看到這個類繼承了TaskInputOutputContextImpl。所以就繼承他的write方法,然後這個write函數調用的是output的write,我們知道這個output參數類型是一個RecordReader,實際上這個output就是MapTask中定義的output,這個output是一個NewOutputCollector,也就是說是調用的NewOutputCollector的write方法,在這個write中我們看到調用了collector的collect,這個collecter就是Maptask.MapOutputBuffer。
在調用Maptask.MapOutputBuffer的collect時增加了一個參數partition,是指明KV去向的,這個值是有job.setPartitionerClass指定的,沒有設置就使用了hashPartitioner。下面所有的工作就是由MapTask的MapOutputBuffer來完成了。獲取視頻中文檔資料及完整視頻的夥伴請加QQ群:947967114
大數據:Mapper輸出緩沖區MapOutputBuffer