1. 程式人生 > >【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程

【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程


title: 【MapReduce詳解及原始碼解析(一)】——分片輸入、Mapper及Map端Shuffle過程
date: 2018-12-03 21:12:42
tags: Hadoop
categories: 大資料
toc: true
點選檢視我的部落格:Josonlee’s Blog
版權宣告:本文為博主原創文章,未經博主允許不得轉載(https://blog.csdn.net/lzw2016/)


文章目錄


之前在看這一部分內容時,也挺煩的,細讀一遍《Hadoop 海量資料處理·範東來著》書和老師給的若干Demo程式碼,自己再看看MapReduce的部分原始碼後,感觸挺深的。把自己所領會的記錄在這裡吧,文章挺長,一次可能寫不完(寫不完下次再說吧)

前瞻

使用者向Hadoop提交的最小單位是MR作業job,MR計算的最小單位是任務task。job分為多個task,task又分map任務,reduce任務
在Hadoop1.0版本,客戶端向Hadoop提交作業,JobTracker會將該作業拆分為多個任務,並根據心跳資訊交由空閒的TaskTracker 啟動。一個TaskTracker能夠啟動的任務數量是由TaskTracker 配置的任務槽(slot) 決定。槽是Hadoop的計算資源的表示模型,Hadoop將各個節點上的多維度資源(CPU、記憶體等)抽象成一維度的槽,這樣就將多維度資源分配問題轉換成維度的槽 分配的問題。在實際情況中, Map任務和Reduce任務需要的計算資源不盡相同,Hadoop又將槽分成Map槽和Reduce槽,並且Map任務只能使用Map槽,Reduce任務只能使用Reduce槽。
這樣做效能會很低,所以在Hadoop2.0版本,資源管理排程框架改為了Yarn,但MR任然作為計算框架存在

MapReduce過程概覽

在這裡插入圖片描述
這張圖出處不知,我也就隨便用了

一、輸入資料分片並轉化為鍵值對

在進行map計算之前,mapreduce會根據輸入檔案計算輸入分片(input split), 每個輸入分片(InputSplit)針對一個map任務輸入分片(input split)儲存的並非資料本身, 而是邏輯上一個分片長度和一個記錄資料的位置的陣列。 預設分片等同於塊大小,分片大小對MR效能影響很大,儘量和block大小相近可以提供map任務計算的資料本地性

可以設定一個map任務的參考個數值,見引數mapred.map.tasks,只是參考具體還是取決於分片數

下面原始碼中會看到分片大小具體是如何設定的

MapReduce Input Split(輸入分/切片)詳解,這篇文章對分片這一塊總結的挺不錯
​​
看下原始碼中InputSplit的實現,見InputFormat資料格式轉換介面
在這裡插入圖片描述

如圖,左側是實現該介面的幾個格式化資料的類,右側是該介面的兩個抽象方法:getSplits是將資料切分成若干個分片,createRecordReader是將輸入的分片解析成鍵值對(key-value),鍵是偏移量,值是該行的內容

FileInputFormat類及其子類的相關原始碼解析

繼續看下去,看下用的比較多的FileInputFormat類的原始碼

  • 分片大小計算
// 計算分片大小
protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

程式碼中的minSize、maxSize定義

minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size

blockSize是塊大小,所以就是用設定中這三個引數而定的

  • 資料分片
// 資料分片
 public List<InputSplit> getSplits(JobContext job) throws IOException {
    Stopwatch sw = new Stopwatch().start();
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    // 此處省略

    if (isSplitable(job, path)) {  //可分片
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          //while迴圈和if判斷就是將輸入的資料長不斷分片過程
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
     // 獲取邏輯上每個切片的位置,長度,包含該分片的主機列表和在記憶體中包含該分片的主機列表
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        }
 // 此處省略
  return splits;
}
  • 解析key-value方法createRecordReader()

FileInputFormat也是個抽象類,繼承自該類的也不少,主要是根據不同檔案型別定義不同的解析key-value的方法
在這裡插入圖片描述

如圖,最常用的也是預設的輸入格式轉化類TextInputFormat,針對文字檔案

  • TextInputFormat 讀取輸入InputSplit的每行內容作為一次輸入,並以換行符或回車符表示本行結束
  • KeyValueTextInputFormat讀取輸入InputSplit的每行內容作為一次輸入,並以換行符或回車符表示本行結束,每行內容按分隔符 (預設是製表符 \t) 分為鍵和值部分。如果不存在分隔符,則鍵將是整行內容,並且值為空

程式中可以自定義分隔符,比如定義為逗號conf.set(“mapreduce.input.keyvaluelinerecordreader.key.value.separator”, “,”)

  • NLineInputFormat可以讓Mapper收到固定行數的輸入,每個map程序處理的 InputSplit不再按block塊去劃分,而是按指定的N值,即每個InputSplit中只有N行記錄

程式中指定N大小:比如1000行, conf.setInt(“mapreduce.input.lineinputformat.linespermap”, 1000)
這個N對MR效能有很大的影響,每N行是一個map task,快慢問題,所以要選取合適

  • SequenceFileInputFormat 是Hadoop的順序檔案格式,儲存二進位制的<K,V>對的序列。它的輸入格式<K,V>由順序檔案決定,只需要保證map輸入型別匹配即可

  • FixedLengthInputFormat是一種用於讀取包含固定長度記錄的輸入檔案的輸入格式,記錄的內容不一定是文字,可以是任意的二進位制資料

程式中必須設定定長,如20,conf.setInt(FixedLengthInputFormat.FIXED_RECORD_LENGTH, 20);

  • CombineFileInputFormat 是一個抽象的 InputFormat,它主要是針對小檔案而設計的,可以把多個檔案打包到一個分片中以便每個Mapper處理更多的操作(沒怎麼看過這個,也不知道這怎麼用)

以上大概就是Map任務輸入資料分片的部分,還有就是程式設計時,如果使用不同的格式化類要在程式中指定輸入資料格式化類

job.setInputFormatClass(Class<? extends InputFormat> cls)
// 如,job.setInputFormatClass(NLineInputFormat.class)

二、Map過程

DataNode中的資料經上文所談到的邏輯上分片、轉化後分配給每一個Map Task。有一點需要知道,Map、Reduce的輸入輸出都是k-v鍵值對。四隊k-v,<k1,v1>、<k2,v2>。。。

從程式設計角度,我們做的第一個工作就是編寫map函式邏輯,這個map函式就是重寫類Mapper的map函式,原始碼如下

/**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  @SuppressWarnings("unchecked")
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }

你所要做的就是,把輸入value轉出你需要的Map輸出<k2,v2>

Mapper中還有setUp初始化,cleanup結束map task方法,以及run方法

public void run(Context context) throws IOException, InterruptedException {// 引數是Context,分片
    setup(context);	// 初始化一個map任務
    try {
      while (context.nextKeyValue()) {	// 還有鍵值對,繼續對每一對進行map方法呼叫
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);	//結束map任務
    }
  }

由程式碼可看出,通常我們看到的一個檔案中每一行都會進行一次map,這就是由run方法完成的。map輸出結果不論Shuffle的話,通常是進行Combine。比如說詞頻統計,“hello hello you”,經過組合輸出<hello,2>,<you,1>

三、Shuffle過程

Shuffle過程在Map和Reduce端都會進行,Shuffle分為分割槽(Partition)、排序(Sort)、分組(Group)、組合(Combine)

Map端Shuffle過程

Map任務在完成後輸出的<k2,v2>,儲存在一個環形記憶體緩衝區中,預設100M。他有一個閾值,預設是0.8,一旦達到閾值,後臺會開啟一個執行緒以輪詢方式將緩衝區中的內容溢寫入磁碟。溢寫和map輸出寫入快取可同時進行,除非快取區已滿
在這裡插入圖片描述

如圖,快取中的輸出結果,先經過分割槽、排序、分組整合成不同(3個)分割槽,然後溢寫磁碟,再次排序、分組整合成不同(3個)分割槽。圖中並沒有展示出combine的部分

  • Partition 分割槽

分割槽是一種需求吧,由key值決定Mapper的輸出會被哪一個Reducer處理。比如說按年份,按月份分割槽

@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class Partitioner<KEY, VALUE> {	//抽象類Partitioner
//就一個函式,numPartitons是要分割槽的數量
public abstract int getPartition(KEY key, VALUE value, int numPartitions);	//返回值是key所對應的分割槽number
// 比如說按月分割槽,<1,xxx>,1月對應分割槽number為1
}
  • Sort 排序

Shuffle過程會有三次排序,其中Map階段有兩次,有上文圖片也可知
Shuffle預設是對key升序排序的,你也可以指定你的排序規則。如何指定,要實現WritableComparable介面,這有個例子

* <p>Example:</p>
 * <p><blockquote><pre>
 *     public class MyWritableComparable implements WritableComparable<MyWritableComparable> {
 *       // Some data
 *       private int counter;
 *       private long timestamp;
 *       
 *       public void write(DataOutput out) throws IOException {		//有些複雜排序,比如二次排序,需要重寫該方法
 *         out.writeInt(counter);
 *         out.writeLong(timestamp);
 *       }
 *       
 *       public void readFields(DataInput in) throws IOException {	//有些複雜排序,需要重寫該方法
 *         counter = in.readInt();
 *         timestamp = in.readLong();
 *       }
 *       
 *       public int compareTo(MyWritableComparable o) {	//這個是你要修改的
 *         int thisValue = this.value;
 *         int thatValue = o.value;
 *         return (thisValue &lt; thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
 *       }
 *
 *       public int hashCode() {
 *         final int prime = 31;
 *         int result = 1;
 *         result = prime * result + counter;
 *         result = prime * result + (int) (timestamp ^ (timestamp &gt;&gt;&gt; 32));
 *         return result
 *       }
 *     }

只能說具體問題,具體分析吧

這裡標記下

a.compareTo(b)按照a升序排序
b.compareTo(a) 按照a降序排序
返回負數,就是降序

  • Group 分組

分組是將具有相同key的values放置在一起,這個不是分割槽。分割槽是寫入不同檔案,分組是聚合key相同的,我的理解

  • Combine 組合

Combine很簡單,就一句話,Combine做的工作和Reduce是一樣的。所以他也繼承Reducer類,完成reduce方法,要求輸出k-v型別等於Reducer輸入k-v型別

程式中通過類似job.setCombinerClass(IntSumReducer.class);來指定指定Combiner類

經常看到說Combine針對求解最小、大值,不適合求平均數。我感覺也可以,實現邏輯是靠自己,你只要保證combine輸出型別和Reducer輸入、出一致就可以了,然後具體如何定義結構也不難

如果已經指定Combiner且溢位寫次數至少為3時,Combiner 就會在輸出檔案寫到磁碟之前執行。如前文所述,Combiner 可以多次執行,並不影響輸出結果。執行Combiner的意義在”於使map輸出的中間結果更緊湊,使得寫到本地磁碟和傳給Reducer的資料更少。

  • 寫磁碟補充

map輸出儲存的格式是IFile,IFile格式支援行壓縮。寫磁碟過程,壓縮map的輸出能提高I/O效能,佔空間小,傳給reduce的資料量也減小。(預設不壓縮的)

支援壓縮格式有:
在這裡插入圖片描述

  • bzip2 壓縮效果最好,壓縮/解壓速度最慢
  • LZO 壓縮效果不如bzip2和gzip,壓縮/解壓速度最快
  • gzip 壓縮效果不如 bzip2,壓縮/解壓速度較快