Source code analysis of the shuffle in the MapTask phase

1. Collection phase

In the Mapper, calling context.write(key, value) actually delegates to the write method of a proxy object, the NewOutputCollector held in output:

public void write(KEYOUT key, VALUEOUT value

                    ) throws IOException, InterruptedException {

    output.write(key, value);

  }

 

That write method in turn calls collect() on the MapOutputBuffer. Before collecting, it calls the partitioner to compute the partition number of each key-value pair:

@Override

    public void write(K key, V value) throws IOException, InterruptedException {

      collector.collect(key, value,

                        partitioner.getPartition(key, value, partitions));

    }

2. Creating the NewOutputCollector object

@SuppressWarnings("unchecked")

    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,

                       JobConf job,

                       TaskUmbilicalProtocol umbilical,

                       TaskReporter reporter

                       ) throws IOException, ClassNotFoundException {

    // Create the buffer object that actually collects the key-value pairs

      collector = createSortingCollector(job, reporter);

 

   // Get the total number of partitions (the number of ReduceTasks)

      partitions = jobContext.getNumReduceTasks();

      if (partitions > 1) {

        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)

          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);

      } else {

        // Default case (a single ReduceTask): create an anonymous inner class that assigns every key-value pair to partition 0

        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {

          @Override

          public int getPartition(K key, V value, int numPartitions) {

            return partitions - 1;

          }

        };

      }

    }
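The branch in this constructor can be tried out without a Hadoop dependency. The following is a minimal sketch, assuming a hypothetical SimplePartitioner interface that stands in for org.apache.hadoop.mapreduce.Partitioner; the class and method names are illustrative only and are not part of Hadoop.

```java
final class PartitionerChoiceSketch {
    /** Hypothetical stand-in for org.apache.hadoop.mapreduce.Partitioner. */
    interface SimplePartitioner<K, V> {
        int getPartition(K key, V value, int numPartitions);
    }

    /**
     * Same decision as the constructor above: use the configured partitioner
     * when there is more than one partition, otherwise a constant one.
     */
    static <K, V> SimplePartitioner<K, V> choose(int partitions,
                                                 SimplePartitioner<K, V> configured) {
        if (partitions > 1) {
            return configured;
        }
        // With a single partition, partitions - 1 is always 0.
        return (key, value, numPartitions) -> partitions - 1;
    }
}
```

With partitions == 1 the configured partitioner is never consulted, which is why a custom Partitioner has no effect on a job with a single ReduceTask.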

3. Creating the circular buffer object

@SuppressWarnings("unchecked")

  private <KEY, VALUE> MapOutputCollector<KEY, VALUE>

          createSortingCollector(JobConf job, TaskReporter reporter)

    throws IOException, ClassNotFoundException {

    MapOutputCollector.Context context =

      new MapOutputCollector.Context(this, job, reporter);

    // Read mapreduce.job.map.output.collector.class from the current Job's configuration; if it is not set, use MapOutputBuffer.class

    Class<?>[] collectorClasses = job.getClasses(

      JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);

    int remainingCollectors = collectorClasses.length;

    Exception lastException = null;

    for (Class clazz : collectorClasses) {

      try {

        if (!MapOutputCollector.class.isAssignableFrom(clazz)) {

          throw new IOException("Invalid output collector class: " + clazz.getName() +

            " (does not implement MapOutputCollector)");

        }

        Class<? extends MapOutputCollector> subclazz =

          clazz.asSubclass(MapOutputCollector.class);

        LOG.debug("Trying map output collector class: " + subclazz.getName());

      // Create the buffer object

        MapOutputCollector<KEY, VALUE> collector =

          ReflectionUtils.newInstance(subclazz, job);

      // Once the buffer object has been created, initialize it

        collector.init(context);

        LOG.info("Map output collector class = " + collector.getClass().getName());

        return collector;

      } catch (Exception e) {

        String msg = "Unable to initialize MapOutputCollector " + clazz.getName();

        if (--remainingCollectors > 0) {

          msg += " (" + remainingCollectors + " more collector(s) to try)";

        }

        lastException = e;

        LOG.warn(msg, e);

      }

    }

    throw new IOException("Initialization of all the collectors failed. " +

      "Error in last collector was :" + lastException.getMessage(), lastException);

  }
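The loop above is a "try each candidate, keep the first one that initializes" pattern. Here is a minimal sketch of the same idea, assuming a hypothetical Collector interface in place of MapOutputCollector and Supplier objects in place of reflection; none of these names come from Hadoop.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

final class CollectorFallbackSketch {
    /** Hypothetical stand-in for MapOutputCollector. */
    interface Collector {
        void init() throws Exception;
    }

    /** Returns the first candidate whose creation and init() both succeed. */
    static Collector createFirstWorking(List<Supplier<Collector>> candidates)
            throws IOException {
        Exception lastException = null;
        for (Supplier<Collector> candidate : candidates) {
            try {
                Collector collector = candidate.get();
                collector.init();
                return collector;
            } catch (Exception e) {
                // Remember the failure and fall through to the next candidate.
                lastException = e;
            }
        }
        throw new IOException("Initialization of all the collectors failed",
                              lastException);
    }
}
```

Only the last failure is kept as the cause of the final IOException, just as lastException is in the original method.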

3 (continued). Initializing MapOutputBuffer, the circular buffer object

@SuppressWarnings("unchecked")

    public void init(MapOutputCollector.Context context

                    ) throws IOException, ClassNotFoundException {

      job = context.getJobConf();

      reporter = context.getReporter();

      mapTask = context.getMapTask();

      mapOutputFile = mapTask.getMapOutputFile();

      sortPhase = mapTask.getSortPhase();

      spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);

     // Get the total number of partitions, which is determined by the number of ReduceTasks

      partitions = job.getNumReduceTasks();

      rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

 

      //sanity checks

      // Read mapreduce.map.sort.spill.percent from the current configuration; if it is not set, it is 0.8

      final float spillper =

        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);

     // Read mapreduce.task.io.sort.mb; if it is not set, it is 100 (MB)

      final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);

      indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,

                                         INDEX_CACHE_MEMORY_LIMIT_DEFAULT);

      if (spillper > (float)1.0 || spillper <= (float)0.0) {

        throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +

            "\": " + spillper);

      }

      if ((sortmb & 0x7FF) != sortmb) {

        throw new IOException(

            "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);

      }

      // The sorter used to sort key-value pairs before a spill: quicksort by default, and it sorts only the index, not the records

      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",

            QuickSort.class, IndexedSorter.class), job);

      // buffers and accounting

      int maxMemUsage = sortmb << 20;

      maxMemUsage -= maxMemUsage % METASIZE;

     // Stores the key-value pairs

      kvbuffer = new byte[maxMemUsage];

      bufvoid = kvbuffer.length;

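    // Stores the metadata of each key-value pair (partition number, index and so on); kvmeta is an int view over the same kvbuffer array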

      kvmeta = ByteBuffer.wrap(kvbuffer)

         .order(ByteOrder.nativeOrder())

         .asIntBuffer();

      setEquator(0);

      bufstart = bufend = bufindex = equator;

      kvstart = kvend = kvindex;

 

      maxRec = kvmeta.capacity() / NMETA;

      softLimit = (int)(kvbuffer.length * spillper);

      bufferRemaining = softLimit;

      LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);

      LOG.info("soft limit at " + softLimit);

      LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);

      LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

 

      // k/v serialization

       // Get the key comparator used by the quicksort; records are compared by key only, never by value

      comparator = job.getOutputKeyComparator();

 

    // Get the serializers for keys and values

      keyClass = (Class<K>)job.getMapOutputKeyClass();

      valClass = (Class<V>)job.getMapOutputValueClass();

      serializationFactory = new SerializationFactory(job);

      keySerializer = serializationFactory.getSerializer(keyClass);

      keySerializer.open(bb);

      valSerializer = serializationFactory.getSerializer(valClass);

      valSerializer.open(bb);

 

      // output counters

      mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);

      mapOutputRecordCounter =

        reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);

      fileOutputByteCounter = reporter

          .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

 

     // Data spilled to disk may be compressed; get the configured compression codec

      // compression

      if (job.getCompressMapOutput()) {

        Class<? extends CompressionCodec> codecClass =

          job.getMapOutputCompressorClass(DefaultCodec.class);

        codec = ReflectionUtils.newInstance(codecClass, job);

      } else {

        codec = null;

      }

 

     // Get the Combiner component

      // combiner

      final Counters.Counter combineInputCounter =

        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);

      combinerRunner = CombinerRunner.create(job, getTaskID(),

                                             combineInputCounter,

                                             reporter, null);

      if (combinerRunner != null) {

        final Counters.Counter combineOutputCounter =

          reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);

        combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);

      } else {

        combineCollector = null;

      }

      spillInProgress = false;

      minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);

     // Run the spill thread as a daemon: spilling happens in a separate background thread, not in the thread that collects records

      spillThread.setDaemon(true);

      spillThread.setName("SpillThread");

      spillLock.lock();

      try {

     // Start the thread

        spillThread.start();

        while (!spillThreadRunning) {

          spillDone.await();

        }

      } catch (InterruptedException e) {

        throw new IOException("Spill thread failed to initialize", e);

      } finally {

        spillLock.unlock();

      }

      if (sortSpillException != null) {

        throw new IOException("Spill thread failed to initialize",

            sortSpillException);

      }

    }
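The sizing arithmetic in init() is easy to check in isolation. The sketch below repeats the same computations on plain numbers without allocating the buffer. NMETA = 4 and METASIZE = 16 are assumptions taken from the MapTask constants rather than from the excerpt above, and the class name is illustrative only.

```java
final class SortBufferSizingSketch {
    // Assumed values of the MapTask constants: 4 ints (16 bytes) of metadata per record.
    static final int NMETA = 4;
    static final int METASIZE = NMETA * 4;

    final int bufferBytes;  // length of kvbuffer
    final int softLimit;    // number of bytes that triggers a spill
    final int maxRecords;   // kvmeta.capacity() / NMETA

    SortBufferSizingSketch(int sortMb, float spillPercent) {
        if (spillPercent > 1.0f || spillPercent <= 0.0f) {
            throw new IllegalArgumentException("Invalid spill percent: " + spillPercent);
        }
        // Only 11 bits are allowed, so sortMb must be in [0, 2047];
        // otherwise sortMb << 20 would overflow an int.
        if ((sortMb & 0x7FF) != sortMb) {
            throw new IllegalArgumentException("Invalid sort MB: " + sortMb);
        }
        int maxMemUsage = sortMb << 20;          // megabytes to bytes
        maxMemUsage -= maxMemUsage % METASIZE;   // round down to whole metadata slots
        bufferBytes = maxMemUsage;
        softLimit = (int) (bufferBytes * spillPercent);
        maxRecords = (bufferBytes / 4) / NMETA;  // an IntBuffer view has length / 4 ints
    }
}
```

With the defaults (100 MB and 0.8) this gives a 104857600-byte buffer, a soft limit of 83886080 bytes and room for 6553600 metadata entries.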

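4. Obtaining the Partitioner

The class is read from mapreduce.job.partitioner.class in the configuration; if none is specified, HashPartitioner.class is used.

In other words, if the number of ReduceTasks is greater than 1 and no partitioner has been set, HashPartitioner is used.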

@SuppressWarnings("unchecked")

  public Class<? extends Partitioner<?,?>> getPartitionerClass()

     throws ClassNotFoundException {

    return (Class<? extends Partitioner<?,?>>)

      conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);

  }

 

 

public class HashPartitioner<K, V> extends Partitioner<K, V> {

 

  /** Use {@link Object#hashCode()} to partition. */

  public int getPartition(K key, V value,

                          int numReduceTasks) {

    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;

  }

 

}

 

Constraint on the partition number: 0 <= partition number < total number of partitions (the number of ReduceTasks)

if (partition < 0 || partition >= partitions) {

        throw new IOException("Illegal partition for " + key + " (" +

            partition + ")");

      }
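The & Integer.MAX_VALUE in HashPartitioner is what keeps the result inside this range: it clears the sign bit, so a negative hashCode() cannot produce a negative remainder. A minimal sketch, using a hypothetical class name and an Integer key (whose hashCode() is the value itself) to make the effect visible:

```java
import java.io.IOException;

final class HashPartitionSketch {
    /** Same formula as HashPartitioner.getPartition. */
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Same range check as the excerpt above; returns the partition if it is legal. */
    static int checked(Object key, int partition, int partitions) throws IOException {
        if (partition < 0 || partition >= partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")");
        }
        return partition;
    }

    public static void main(String[] args) throws IOException {
        Integer key = -7;
        System.out.println(key.hashCode() % 3);                    // prints -1
        System.out.println(checked(key, partitionFor(key, 3), 3)); // prints 1
    }
}
```

A custom Partitioner that returned key.hashCode() % numPartitions directly would fail the range check for this key.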

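5. The MapTask shuffle flow

              ① map() calls context.write()

              ② This ends up calling collect() on the MapOutputBuffer

                            Before that call, the partitioner component (Partitioner) computes the partition number of the current key-value pair

              ③ The current key-value pair is collected into the MapOutputBuffer

                            Once the spill threshold is exceeded, the background spill thread is woken up to perform the spill

              ④ Before spilling, the key-value pairs are sorted with quicksort, first by partition number and then by key within each partition

                            The sort does not move key-value pairs in memory; it only records their sorted order in the index

              ⑤ The spill starts: following the sorted index, the data is written to a temporary spill file

                            If no Combiner is defined, the data is spilled directly

                            If a Combiner is defined, the key-value pairs are processed by CombinerRunner.combine() before being spilled

              ⑥ Every spill produces one temporary file, so several spills leave several files

              ⑦ Finally, flush() is executed once to spill the remaining key-value pairs

              ⑧ mergeParts: the results of all spills are merged into a single file

                     During the merge, a merge sort is performed, so that each partition of the merged file is also sorted

                     If a Combiner is defined, it runs again, provided there are at least 3 spill files (minSpillsForCombine, default 3)

                     Otherwise, the merged data is written out directly

              ⑨ The end result is a single final file, divided by partition number into several parts; the key-value pairs in each part are already sorted and wait for the ReduceTasks to copy the data of their partitions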
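Step ④ is the least intuitive part: only the index is sorted, while the records stay where they are. The following is a minimal sketch of that idea. It uses the JDK's Arrays.sort on an index array instead of Hadoop's QuickSort and IndexedSorter, and plain arrays instead of kvbuffer and kvmeta, so it illustrates the principle rather than the actual implementation.

```java
import java.util.Arrays;
import java.util.Comparator;

final class IndexSortSketch {
    /**
     * Returns the record indices ordered by partition number, then by key.
     * The partitions and keys arrays themselves are left untouched.
     */
    static Integer[] sortedIndex(int[] partitions, String[] keys) {
        Integer[] index = new Integer[keys.length];
        for (int i = 0; i < index.length; i++) {
            index[i] = i;
        }
        Arrays.sort(index, Comparator
            .<Integer>comparingInt(i -> partitions[i])  // partition number first
            .thenComparing(i -> keys[i]));              // then key within a partition
        return index;
    }
}
```

For partitions {1, 0, 1, 0} and keys {"b", "z", "a", "c"} the returned order is 3, 1, 2, 0: partition 0 holds "c" and "z", partition 1 holds "a" and "b". A spill would then walk this index and write the records in that order.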

 

6. Combiner

A combiner is really just a Reducer type:

Class<? extends Reducer<K,V,K,V>> cls =

        (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

                    

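When the Combiner runs:

MapTask:

              ① On every spill, if a Combiner is specified, it runs before the data is written

              ② When the spill segments are merged into one final file, the Combiner runs again, provided there are at least 3 segments

ReduceTask:

              ③ When a ReduceTask runs, it starts the shuffle to copy the data produced by the MapTasks

                     After being copied, the data enters the shuffle's working memory, where it is merged and sorted

                     If there is too much data and memory runs short, part of the data is spilled to disk

                     If such a spill happens, the combiner runs again

 

Provided a Combiner is specified, ① always runs; ② and ③ run only when their conditions are met.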
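Condition ② can be illustrated with a small simulation of the final merge. This is a sketch under stated assumptions, not Hadoop's mergeParts: spill segments are modelled as lists of (word, count) entries for a single partition, a stable sort stands in for the merge sort, and the combiner is a hypothetical word-count style sum.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class MergeWithCombinerSketch {
    // Default of minSpillsForCombine as read in init().
    static final int MIN_SPILLS_FOR_COMBINE = 3;

    static List<Map.Entry<String, Integer>> merge(
            List<List<Map.Entry<String, Integer>>> spills, boolean combinerDefined) {
        List<Map.Entry<String, Integer>> merged = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> spill : spills) {
            merged.addAll(spill);
        }
        merged.sort(Map.Entry.comparingByKey());  // stands in for the merge sort
        if (!combinerDefined || spills.size() < MIN_SPILLS_FOR_COMBINE) {
            return merged;                        // written out without combining
        }
        // Hypothetical combiner: sum the counts of adjacent equal keys.
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        for (Map.Entry<String, Integer> e : merged) {
            int last = combined.size() - 1;
            if (last >= 0 && combined.get(last).getKey().equals(e.getKey())) {
                int sum = combined.get(last).getValue() + e.getValue();
                combined.set(last, new SimpleEntry<String, Integer>(e.getKey(), sum));
            } else {
                combined.add(new SimpleEntry<String, Integer>(e.getKey(), e.getValue()));
            }
        }
        return combined;
    }
}
```

With two segments the duplicates of a key survive the merge even though a combiner is defined; with three or more they are summed. This is why a combiner must never change the final result of the job, only the amount of data passed on.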