MapTask Shuffle Source Code Analysis
1. The Collection Phase
In the Mapper, calling context.write(key, value) actually invokes the write() method of the delegate NewOutputCollector:
public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}
What is actually invoked is MapOutputBuffer's collect(); before collecting, the Partitioner is called to compute the partition number of each key-value pair:
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
2. Creating the NewOutputCollector Object
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object actually used to collect key-value pairs
  collector = createSortingCollector(job, reporter);
  // Get the total number of partitions (the number of ReduceTasks)
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: an anonymous inner class assigns every key-value pair
    // to partition 0 (partitions - 1 == 0 when there is a single partition)
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
3. Creating the Circular Buffer Object
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);
  // Read mapreduce.job.map.output.collector.class from the job
  // configuration; if it is not set, use MapOutputBuffer.class
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Instantiate the buffer object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // After creating the buffer object, initialize it
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(), lastException);
}
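Since the loop above tries every configured class in order, the collector is pluggable and a fallback chain can be configured. A hedged sketch (com.example.MyCollector is hypothetical; the literal key is what JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR resolves to in Hadoop 2.x):

Configuration conf = new Configuration();
// Try a custom collector first, falling back to the default MapOutputBuffer;
// createSortingCollector() tries the classes left to right until one of
// them initializes successfully
conf.set("mapreduce.job.map.output.collector.class",
    "com.example.MyCollector,"
    + "org.apache.hadoop.mapred.MapTask$MapOutputBuffer");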
4. Initializing MapOutputBuffer, the Circular Buffer Object
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // Total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Read mapreduce.map.sort.spill.percent from the configuration;
  // defaults to 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Read mapreduce.task.io.sort.mb; defaults to 100 (MB)
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  // 0x7FF == 2047, so sortmb must lie in [0, 2047]
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // The sorter applied to key-value pairs before a spill: QuickSort by
  // default, and only the indices are sorted
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Stores the serialized key-value pairs
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // Stores the metadata of each key-value pair: partition number, indices, etc.
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // Comparator used by the quick sort; sorting is done by key only!
  comparator = job.getOutputKeyComparator();
  // Serializers for the map output key and value
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression
  // Spills to disk may use a compression format; fetch the configured codec
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  // Fetch the Combiner component, if one is configured
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // The spill thread runs as a daemon in the background; spilling and
  // collecting are two separate threads!
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the spill thread
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
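The two knobs read at the top of init() are ordinary job configuration; a small tuning sketch (the values are illustrative only):

Configuration conf = new Configuration();
// Circular buffer size: mapreduce.task.io.sort.mb, default 100 (MB);
// must pass the (sortmb & 0x7FF) == sortmb check, i.e. stay below 2048
conf.setInt("mapreduce.task.io.sort.mb", 200);
// Spill threshold: mapreduce.map.sort.spill.percent, default 0.8, so a
// background spill starts once the buffer is 80% full while collection
// continues in the remaining space
conf.setFloat("mapreduce.map.sort.spill.percent", 0.8f);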
5. Obtaining the Partitioner
The partitioner class is read from mapreduce.job.partitioner.class in the configuration; if it is not specified, HashPartitioner.class is used.
So when there is more than one ReduceTask and no partitioner component has been set, HashPartitioner is the one that runs:
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
public class HashPartitioner<K, V> extends Partitioner<K, V> {
  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
Constraint on the partition number: 0 <= partition number < total number of partitions (the number of ReduceTasks):
if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}
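Any custom partitioner must respect this contract, i.e. return a value in [0, partitions). A minimal sketch (FirstCharPartitioner is a hypothetical example) that routes keys by their first character:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstCharPartitioner extends Partitioner<Text, IntWritable> {
  @Override
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    String s = key.toString();
    int h = s.isEmpty() ? 0 : s.charAt(0);
    // Mask with Integer.MAX_VALUE to force a non-negative value, then take
    // the modulus so the result stays inside [0, numPartitions)
    return (h & Integer.MAX_VALUE) % numPartitions;
  }
}

It would be registered in the driver with job.setPartitionerClass(FirstCharPartitioner.class), which sets mapreduce.job.partitioner.class; with a single ReduceTask, the anonymous partitioner from section 2 is used instead and this class never runs.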
6. The MapTask Shuffle Flow
① In map(), context.write() is called.
② This calls MapOutputBuffer's collect().
The Partitioner component is invoked to compute the partition number of the current key-value pair.
③ The key-value pair is collected into the MapOutputBuffer.
Once the spill threshold is exceeded, the spill thread starts spilling in the background!
④ Before the spill, the key-value pairs sharing a partition number are sorted with the quick sort algorithm!
The sort does not move key-value pairs around in memory; it only records the sorted order of their indices!
⑤ The spill begins: following the sorted indices, the data is written to a temporary spill file.
If no Combiner is defined, the data is spilled directly!
If a Combiner is defined, CombinerRunner.combine() processes the key-value pairs before they are spilled!
⑥ Over multiple spills, each spill produces one temporary file.
⑦ Finally, a flush() is executed to spill the remaining key-value pairs.
⑧ MergeParts: the results of the multiple spills are merged into one overall file!
Before merging into that file, a merge sort is performed to guarantee that each partition of the merged file is also sorted!
If a Combiner is defined, it runs once more (provided the number of spill files is at least 3)!
Otherwise, the segments are merged directly.
⑨ The guaranteed end result is a single final file, divided into sections by partition number; the key-value pairs in each section are already sorted, waiting for the ReduceTasks to copy the data of their respective partitions.
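To make these steps concrete, a minimal driver sketch (reusing the hypothetical WordCountMapper from section 1; IntSumReducer is the stock org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    job.setJarByClass(WordCountDriver.class);
    job.setMapperClass(WordCountMapper.class);  // step 1: map() calls context.write()
    job.setCombinerClass(IntSumReducer.class);  // steps 5 and 8: runs before spills/merges
    job.setReducerClass(IntSumReducer.class);
    job.setNumReduceTasks(4);                   // partitions = 4, so getPartition() is consulted
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}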
7. Combiner
The combiner is in fact just a Reducer type:
Class<? extends Reducer<K,V,K,V>> cls =
(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
When the Combiner runs:
MapTask:
① Before each spill: if a Combiner is specified, it runs.
② When the multiple spill segments are merged into one final file, the Combiner also runs, provided the number of segments is >= 3.
ReduceTask:
③ While a ReduceTask runs, it starts the shuffle process to copy the data produced by the MapTasks!
After the copy, the data enters the shuffle working memory, where it is merged and sorted!
When there is too much data and memory runs short, part of the data is spilled to disk!
Whenever such a spill takes place, the Combiner runs again!
① is guaranteed to run; ② and ③ depend on their conditions!
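Because the combiner can therefore run zero, one, or several times on the same data, its operation must be commutative and associative. A minimal sum-combiner sketch (a hypothetical class, equivalent to the stock IntSumReducer):

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Summation is commutative and associative, so it is safe to apply it
    // any number of times before the final reduce
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}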