1. 程式人生 > >MapReduce理解-深入理解MapReduce

MapReduce理解-深入理解MapReduce

 前面的幾篇部落格主要介紹了Hadoop的儲存HDFS,接下來幾篇部落格主要介紹Hadoop的計算框架MapReduce。本片部落格主要講解MapReduce框架的具體執行流程,以及shuffle過程,當然這方面的技術部落格已經特別多而且都寫得很優秀,我寫本篇部落格之前也有過相關閱讀,受益匪淺。對一些部落格和資料的參考都會才部落格下方參考資料中列出。

MapReduce理解

  • MapRedeuce,我們可以把它分開來理解:
  1. 對映(Mapping) :對集合裡的每個目標應用同一個操作。即,如果你想把表單裡每個單元格乘以二,那麼把這個函式單獨地應用在每個單元格上的操作就屬於mapping(這裡體現了移動計算
    而不是移動資料);
  2. 化簡(Reducing)遍歷集合中的元素來返回一個綜合的結果。即,輸出表單裡一列數字的和這個任務屬於reducing。
  • 計算框架
    一個簡單的MapReduce執行流程一個簡單的MapReduce執行流程
    簡單理解,MapReduce計算框架

    把需要計算的東西放入到MapReduce中進行計算,然後返回一個我們期望的結果。所以首先我們需要一個來源(需要計算的東西)即輸入(input),然後MapReduce操作這個輸入(input),通過定義好的計算模型,最後得到一個(期望的結果)輸出(output)。

  • 計算模型
    Map和ReduceMap和Reduce
    在這裡我們主要討論的是MapReduce計算模型

    在執行一個mapreduce計算任務時候,任務過程被分為兩個階段:map階段和reduce階段,每個階段都是用鍵值對(key/value)作為輸入(input)和輸出(output)。而程式設計師要做的就是定義好這兩個階段的函式:map函式和reduce函式。

例項程式碼

  • 以MapReduce統計單詞次數為例(虛擬碼),主要四個模組來講解,如上圖計算框架:
  1. Input,資料讀入

    1
    2
    3
    4
    5
    6
    
    // 設定資料輸入來源
    FileInputFormat.setInputPaths(job, args[0]);
    FileInputFormat.setInputDirRecursive(job, true); //遞迴
    job.setInputFormatClass(TextInputFormat.class);	//設定輸入格式
    
    //TextInputFormat,一種預設的文字輸入格式,Mapper一次讀取文字中的一行資料。
    
  2. 使用Mapper計算

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    
    //設定Job的Mapper計算類和K2、V2型別
    job.setMapperClass(WordCountMapper.class);	//1.設定Mapper類
    job.setMapOutputKeyClass(Text.class);	//設定Mapper輸出Key的型別
    job.setMapOutputValueClass(LongWritable.class);//設定Mapper輸出Value的型別
    
    //WordCountMapper類
    /**
     * 自定義的Map 需要繼承Mapper
     * K1 : 行序號
     * V1 : 行資訊
     * K2 : 單詞
     * V2 : 次數
     */
    public static class WordCountMapper extends Mapper<LongWritable,Text,Text,LongWritable> {
    
        Text k2 = new Text() ;
        LongWritable v2 = new LongWritable();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException {
    
            //1. 獲取行資訊
            String line = value.toString();
    
            //2. 獲取行的所用單詞
            String[] words = line.split("\t");//這裡假設一行文字單詞分隔符為"\t"
            for (String word : words) {
                k2.set(word.getBytes()) ; //設定鍵
                v2.set(1);                //設定值
                context.write(k2,v2);
            }
    
        }
    }
    
  3. 使用Reducer合併計算

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    //設定Job的Reducer計算類和K3、V3型別
    job.setReducerClass(WordCountReducer.class);	//自定義的Reducer類
    job.setOutputKeyClass(Text.class);		//輸出Key型別
    job.setOutputValueClass(LongWritable.class);	//輸出Value型別
    
    //WordCountReducer 類
    /**
     * 自定義的Reduce 需要繼承Reducer
     * K2 : 字串
     * V3 : 次數(分組)
     * K3 : 字串
     * V3 : 次數(統計總的)
     */
    public static class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
    
        LongWritable v3 = new LongWritable() ;
        int sum  = 0 ;
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) 
                throws IOException, InterruptedException {
            sum = 0 ;
            for (LongWritable value : values) {
                sum +=value.get() ;
            }
            v3.set(sum);
            context.write( key , v3 );
        }
    }
    
  4. Output,資料寫出

    1
    
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    

執行機制

  • 下面從兩個角度來講解MapReduce的執行機制:
  1. 各個角色實體;
  2. 執行的時間先後順序
    MapReduce計算模型的執行機制MapReduce計算模型的執行機制

各個角色實體

  1. 程式執行時過程設計到的一個角色實體
    1.1. Client:編寫mapreduce程式,配置作業,提交作業的客戶端 ;
    1.2. ResourceManager:叢集中的資源分配管理 ;
    1.3. NodeManager:啟動和監管各自節點上的計算資源 ;
    1.4. ApplicationMaster:每個程式對應一個AM,負責程式的任務排程,本身也是執行在NM的Container中 ;
    1.5. HDFS:分散式檔案系統,儲存作業的資料、配置資訊等等。

  2. 客戶端提交Job
    2.1. 客戶端編寫好Job後,呼叫Job例項的Submit()或者waitForCompletion()方法提交作業;
    2.2. 客戶端向ResourceManager請求分配一個Application ID,客戶端會對程式的輸出、輸入路徑進行檢查,如果沒有問題,進行作業輸入分片的計算。

  3. Job提交到ResourceManager
    3.1. 將作業執行所需要的資源拷貝到HDFS中(jar包、配置檔案和計算出來的輸入分片資訊等);
    3.2. 呼叫ResourceManager的submitApplication方法將作業提交到ResourceManager。

  4. 給作業分配ApplicationMaster
    4.1. ResourceManager收到submitApplication方法的呼叫之後會命令一個NodeManager啟動一個Container ;
    4.2. 在該NodeManager的Container上啟動管理該作業的ApplicationMaster程序。

  5. ApplicationMaster初始化作業
    5.1. ApplicationMaster對作業進行初始化操作;
    5.2. ApplicationMaster從HDFS中獲得輸入分片資訊(map、reduce任務數)

  6. 任務分配
    6.1. ApplicationMaster為其每個map和reduce任務向RM請求計算資源;
    6.2. map任務優先於reduce任,map資料優先考慮本地化的資料。

  7. 任務執行,在 Container 上啟動任務(通過YarnChild程序來執行),執行map/reduce任務。

時間先後順序

  1. 輸入分片(input split)
    每個輸入分片會讓一個map任務來處理,預設情況下,以HDFS的一個塊的大小(預設為128M,可以設定)為一個分片。map輸出的結果會暫且放在一個環形記憶體緩衝區中(預設mapreduce.task.io.sort.mb=100M),當該緩衝區快要溢位時(預設mapreduce.map.sort.spill.percent=0.8),會在本地檔案系統中建立一個溢位檔案,將該緩衝區中的資料寫入這個檔案;

  2. map階段:由我們自己編寫,最後呼叫 context.write(…);

  3. partition分割槽階段
    3.1. 在map中呼叫 context.write(k2,v2)方法輸出,該方法會立刻呼叫 Partitioner類對資料進行分割槽,一個分割槽對應一個 reduce task。
    3.2. 預設的分割槽實現類是 HashPartitioner ,根據k2的雜湊值 % numReduceTasks,可能出現“資料傾斜”現象。
    3.3. 可以自定義 partition ,呼叫 job.setPartitioner(…)自己定義分割槽函式。

  4. combiner合併階段:將屬於同一個reduce處理的輸出結果進行合併操作
    4.1. 是可選的;
    4.2. 目的有三個:1.減少Key-Value對;2.減少網路傳輸;3.減少Reduce的處理。

  5. shuffle階段:即Map和Reduce中間的這個過程
    5.1. 首先 map 在做輸出時候會在記憶體裡開啟一個環形記憶體緩衝區,專門用來做輸出,同時map還會啟動一個守護執行緒;
    5.2. 如緩衝區的記憶體達到了閾值的80%,守護執行緒就會把內容寫到磁碟上,這個過程叫spill,另外的20%記憶體可以繼續寫入要寫進磁碟的資料;
    5.3. 寫入磁碟和寫入記憶體操作是互不干擾的,如果快取區被撐滿了,那麼map就會阻塞寫入記憶體的操作,讓寫入磁碟操作完成後再繼續執行寫入記憶體操作;
    5.4. 寫入磁碟時會有個排序操作,如果定義了combiner函式,那麼排序前還會執行combiner操作;
    5.5. 每次spill操作也就是寫入磁碟操作時候就會寫一個溢位檔案,也就是說在做map輸出有幾次spill就會產生多少個溢位檔案,等map輸出全部做完後,map會合並這些輸出檔案,這個過程裡還會有一個Partitioner操作(如上)
    5.6. 最後 reduce 就是合併map輸出檔案,Partitioner會找到對應的map輸出檔案,然後進行復制操作,複製操作時reduce會開啟幾個複製執行緒,這些執行緒默認個數是5個(可修改),這個複製過程和map寫入磁碟過程類似,也有閾值和記憶體大小,閾值一樣可以在配置檔案裡配置,而記憶體大小是直接使用reduce的tasktracker的記憶體大小,複製時候reduce還會進行排序操作和合並檔案操作,這些操作完了就會進行reduce計算了。

  6. reduce階段:由我們自己編寫,最終結果儲存在hdfs上的。

參考