1. 程式人生 > >MapReduce剖析筆記之一:從WordCount理解MapReduce的幾個階段

MapReduce剖析筆記之一:從WordCount理解MapReduce的幾個階段

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

MapReduce即將一個計算任務分為兩個階段:Map、Reduce。為什麼要這麼分解?
為了理解其含義,我們先不管MapReduce這一套框架,從一個簡單的問題來看,如果對於100T的日誌檔案,需要統計其中出現的"ERROR"這個單詞的次數,怎麼辦?
最簡單的方法:單機處理,逐行讀入每一行文字,統計並累加,則得到其值。問題是:因為資料量太大,速度太慢,怎麼辦?自然,多機並行處理就是一個自然的選擇。
那麼,這個檔案怎麼切分到多個機器呢?假定有100臺機器,可以寫一個主程式,將這個100T大檔案按照每個機器儲存1T的原則,在100臺機器上分佈儲存,再把原來單機上的程式拷貝100份(無需修改)至100臺機器上執行得到結果,此時得到的結果只是一箇中間結果,最後需要寫一個彙總程式,將統計結果進行累加,則完成計算。
將大檔案分解後,對單機1T檔案計算的過程就相當於Map,而Map的結果就相當於"ERROR"這個單詞在本機1T檔案中出現的次數,而最後的彙總程式就相當於Reduce,Reduce的輸入來源於100臺機器。在這個簡單的例子中,有100個Map任務,1個Reduce任務。 100臺機器計算後的中間結果需要傳遞到Reduce任務所在機器上,這個過程就是Shuffle,Shuffle這個單詞的含義是”洗牌“,也就是將中間結果從Map所在機器傳輸到Reduce所在機器,在這個過程中,存在網路傳輸。 此時,我們利用上面的例子已經理解了Map-Shuffle-Reduce的基本含義,在上面的例子中,如果還需要對”WARNING“這個單詞進行統計,那麼怎麼辦呢?此時,每個Map任務就不僅需要統計本機1T檔案中ERROR的個數,還需要統計WARNING的次數,然後在Reduce程式中分別進行統計。如果需要對所有單詞進行統計呢?一個道理,每個Map任務對1T檔案中所有單詞進行統計計數,然後 Reduce對所有結果進行彙總,得到所有單詞在100T大檔案中出現的次數。此時,問題可能出現了,因為單詞數量可能很多,Reduce用單機處理也可能存在瓶頸了,於是我們需要考慮用多臺機器平行計算Reduce,假如用2臺機器,因為Reduce只是對單詞進行計數累加,所有可以按照這樣簡單的規則進行:大寫字母A-Z開頭的單詞由Reduce 1累加;小寫字母a-z開頭的單詞由Reduce 2累加。
在這種情況下,100個Map任務執行後的結果,都需要分為兩部分,一部分準備送到Reduce 1統計,一部分準備送到Reduce 2統計,這個功能稱為Partitioner,即將Map後的結果(比如一個文字檔案,記錄了各個單詞在本機檔案出現的次數)分解為兩部分(比如兩個文字 檔案),準備送到兩個Reduce任務。 因此,Shuffle在這裡就是從100個Map任務和2個Reduce任務之間傳輸中間結果的過程。
我們繼續考慮幾個問題:
1、 如果Map後的中間結果資料量較大,Shuffle過程對網路頻寬要求較高,因此需要將Map後的結果儘可能減小,這個功能當然可以在Map內自己搞 定,不過MapReduce將這個功能單獨拎出來,稱為Combiner,即合併,這個Combiner,指的是Map任務後中間結果的合併,相比於 Reduce的最終合併,這裡相當於先進行一下區域性彙總,減小中間結果,進而減小網路傳輸量。所以,在上面的例子中,假如Map並不計數,只是記錄單詞出現這個資訊,輸出結果是<ERROR,1>,<WARNING,1>,<WARNING,1>.....這樣一個Key-Value序列,Combiner可以進行區域性彙總,將Key相同的Value進行累加,形成一個新的Key-Value序列:<ERROR,14>,<WARNING,27>,.....,這樣就大大減小了Shuffle需要的網路頻寬,要知道現在資料中心一般使用千兆乙太網,好些的使用萬兆乙太網,TCP/IP傳輸的效率不太高。這裡Combiner彙總函式實際上可以與Reduce的彙總函式一致,只是輸入資料不同。
2、 來自100個Map任務後的結果分別送到兩個Reduce任務處理。對於任何一個Reduce任務,輸入是一堆<ERROR,14>這樣的 Key-Value序列,因為100個Map任務都有可能統計到ERROR的次數,因此這裡會先進行一個歸併,即將相同單詞的歸併到一起,形 成<ERROR, <14,36,.....>>,<WARNING,<27,45,...>>這樣一個仍然是Key-Value的 序列,14、36、。。。分別表示第1、2、。。。臺機器中ERROR的統計次數,這個歸併過程在MapReduce中稱為Merge。如果merge後 再進行Reduce,那麼就只需要統計即可;如果事先沒有merge,那麼Reduce自己完成這一功能也行,只是兩種情況下Reduce的輸入Key- Value形式不同。
3、如果要求最後的單詞統計結果還要形成字典序怎麼辦呢?可以自己在 Reduce中進行全排序,也可以100個Map任務中分別進行區域性排序,然後將結果發到Reduce任務時,再進行歸併排序。這個過程 MapReduce也內建支援,因此不需要使用者自己去寫排序程式,這個過程在MapReduce中稱為Sort。
到這裡,我們理解了MapReduce中的幾個典型步驟:Map、Sort、Partitioner、 Combiner、Shuffle、Merge、Reduce。MapReduce程式之所以稱為MapReduce,就說明Map、Reduce這兩個 步驟對於一個平行計算來說幾乎是必須的,你總得先分開算吧,所以必須有Map;你總得彙總吧,所以有Reduce。當然,理論上也可以不需要 Reduce,如果Map後就得到你要的結果的話。
Sort對於不需要順序的程式裡沒意義(但MapReduce預設做了排序);

Partitioner對於Reduce只有一個的時候沒意義,如果有多個Reduce,則需要,至於怎麼分,使用者可以繼承Partitioner標準類,自己實現分解函式。控制中間結果如何傳輸。MapReduce提供的標準的Partitioner是 一個介面,使用者可以自己實現getPartition()函式,MapReduce也提供了幾個基本的實現,最典型的HashPartitioner是根 據使用者設定的Reduce任務數量(注意,MapReduce中,Map任務的個數最終取決於資料分佈,Reduce則是使用者直接指定),按照雜湊進行計算的:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {

  public void configure(JobConf job) {}

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K2 key, V2 value,int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
這裡,numReduceTasks就是使用者設定的Reduce任務數量;
K2 key, V2 value 就是Map計算後的中間結果。
Combiner可以選擇性放棄,但考慮到網路頻寬,可以自己寫相應的函式實現區域性合併功能。很多情況下,直接利用Reduce那個程式即可,WordCount這個標準程式裡就是這麼用的。
Shuffle自然是必須的,不用寫,根據Partitioner邏輯,框架自己去執行結果傳輸。
Merge也不是必須的,可以揉到Reduce裡面實現等等也可以。因為這些操作的資料結構都是Key-Value,Reduce的輸入只要是一個Key-Value即可,相當靈活。
我們再來看WordCount,這個MapReduce程式中定義了一個類:
  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>
而Mapper是Hadoop中的一個介面,其定義為:
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {
  
  /** 
   * Maps a single input key/value pair into an intermediate key/value pair.
   * 
   * <p>Output pairs need not be of the same types as input pairs.  A given 
   * input pair may map to zero or many output pairs.  Output pairs are 
   * collected with calls to 
   * {@link OutputCollector#collect(Object,Object)}.</p>
   *
   * <p>Applications can use the {@link Reporter} provided to report progress 
   * or just indicate that they are alive. In scenarios where the application 
   * takes an insignificant amount of time to process individual key/value 
   * pairs, this is crucial since the framework might assume that the task has 
   * timed-out and kill that task. The other way of avoiding this is to set 
   * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
   * time-outs).</p>
   * 
   * @param key the input key.
   * @param value the input value.
   * @param output collects mapped keys and values.
   * @param reporter facility to report progress.
   */
  void map(K1 key, V1 value, OutputCollector<K2, V2> output, Reporter reporter)
  throws IOException;
}
因此,Mapper裡面並沒有規定輸入輸出的型別是什麼,只要是KeyValue的即可,K1、V1、K2、V2是什麼由使用者指定,反正只是實現K1、V1到K2、V2的對映即可。

在WordCount中實現了繼承於Mapper<Object, Text, Text, IntWritable>的一個TokenizerMapper類,實現了map函式:map(Object key, Text value, Context context ) ;

TokenizerMapper中,輸入的Key-Value是<Object, Text>,輸出是<Text, IntWritable>,在WordCount程式裡,K1代表一行文字的起始位置,V1代表這一行文字;

K2代表單詞,V2代表"1",用於後面的累和。

同樣,在MapReduce中,Reducer也是一個介面,其宣告為:
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {
  
  /** 
   * <i>Reduces</i> values for a given key.  
   * 
   * <p>The framework calls this method for each 
   * <code><key, (list of values)></code> pair in the grouped inputs.
   * Output values must be of the same type as input values.  Input keys must 
   * not be altered. The framework will <b>reuse</b> the key and value objects
   * that are passed into the reduce, therefore the application should clone
   * the objects they want to keep a copy of. In many cases, all values are 
   * combined into zero or one value.
   * </p>
   *   
   * <p>Output pairs are collected with calls to  
   * {@link OutputCollector#collect(Object,Object)}.</p>
   *
   * <p>Applications can use the {@link Reporter} provided to report progress 
   * or just indicate that they are alive. In scenarios where the application 
   * takes an insignificant amount of time to process individual key/value 
   * pairs, this is crucial since the framework might assume that the task has 
   * timed-out and kill that task. The other way of avoiding this is to set 
   * <a href="{@docRoot}/../mapred-default.html#mapred.task.timeout">
   * mapred.task.timeout</a> to a high-enough value (or even zero for no 
   * time-outs).</p>
   * 
   * @param key the key.
   * @param values the list of values to reduce.
   * @param output to collect keys and combined values.
   * @param reporter facility to report progress.
   */
  void reduce(K2 key, Iterator<V2> values,
              OutputCollector<K3, V3> output, Reporter reporter)
    throws IOException;
}
Reducer的輸入為K2, V2(這個對應於Mapper輸出的經過Shuffle到達Reducer端的K2,V2,), 輸出為K3, V3。
在WordCount中,K2為單詞,V2為1這個固定值(或者為區域性出現次數,取決於是否有Combiner);K3還是單詞,V3就是累和值。
而WordCount裡存在繼承於Reducer<Text, IntWritable, Text, IntWritable>的IntSumReducer類,完成單詞計數累加功能。
對於Combiner,實際上MapReduce沒有Combiner這個基類(WordCount自然也沒有實現),從任務的提交函式來看:
public void setCombinerClass(Class<? extends Reducer> cls
                               ) throws IllegalStateException {
    ensureState(JobState.DEFINE);
    conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);
  }
可以看出,Combiner使用的類實際上符合Reducer。兩者是一樣的。

再來看作業提交程式碼:
在之前先說一下Job和Task的區別,一個MapReduce執行流程稱為一個Job,中文稱“作業”。
在傳統的分散式計算領域,一個Job分為多個Task執行。Task中文一般稱為任務,在Hadoop中,這種任務有兩種:Map和Reduce
所以下面說到Map和Reduce時,指的是任務;說到整個流程時,指的是作業。不過由於疏忽,可能會將作業稱為任務的情況。
根據上下文容易區分出來。

 1     Job job = new Job(conf, "word count");
 2     job.setJarByClass(WordCount.class);
 3     job.setMapperClass(TokenizerMapper.class);
 4     job.setCombinerClass(IntSumReducer.class);
 5     job.setReducerClass(IntSumReducer.class);
 6     job.setOutputKeyClass(Text.class);
 7     job.setOutputValueClass(IntWritable.class);
 8     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
 9     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
10     System.exit(job.waitForCompletion(true) ? 0 : 1);
第1行建立一個Job物件,Job是MapReduce中提供的一個作業類,其宣告為:
public class Job extends JobContext {  
  public static enum JobState {DEFINE, RUNNING};
  private JobState state = JobState.DEFINE;
  private JobClient jobClient;
  private RunningJob info;
.......
之後,設定該作業的執行類,也就是WordCount這個類;

然後設定Map、Combiner、Reduce三個實現類;

之後,設定輸出Key和Value的類,這兩個類表明了MapReduce作業完畢後的結果。

Key即單詞,為一個Text物件,Text是Hadoop提供的一個可以序列化的文字類;

Value為計數,為一個IntWritable物件,IntWritable是Hadoop提供的一個可以序列化的整數類。

之所以不用普通的String和int,是因為輸出Key、 Value需要寫入HDFS,因此Key和Value都要可寫,這種可寫能力在Hadoop中使用一個介面Writable表示,其實就相當於序列化,換句話說,Key、Value必須得有可序列化的能力。Writable的宣告為:
public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>.
   * 
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   * 
   * @param in <code>DataInput</code> to deseriablize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

在第8、9行,還設定了要計算的檔案在HDFS中的路徑,設定好這些配置和引數後,執行作業提交:job.waitForCompletion(true)
waitForCompletion是Job類中實現的一個方法:
public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      jobClient.monitorAndPrintJob(conf, info);
    } else {
      info.waitForCompletion();
    }
    return isSuccessful();
  }
即執行submit函式:
public void submit() throws IOException, InterruptedException, 
                              ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    
    // Connect to the JobTracker and submit the job
    connect();
    info = jobClient.submitJobInternal(conf);
    super.setJobID(info.getID());
    state = JobState.RUNNING;
}
其中,呼叫jobClient物件的submitJobInternal方法進行作業提交。jobClient是 JobClient物件,在執行connect()的時候即創建出來:
private void connect() throws IOException, InterruptedException {
    ugi.doAs(new PrivilegedExceptionAction<Object>() {
      public Object run() throws IOException {
        jobClient = new JobClient((JobConf) getConfiguration());    
        return null;
      }
    });
}
建立JobClient的引數是這個作業的配置資訊,JobClient是MapReduce作業的客戶端部分,主要用於提交作業等等。而具體的作業提交在submitJobInternal方法中實現,關於submitJobInternal的具體實現,包括MapReduce的作業執行流程,較為複雜,留作下一節描述。

關於MapReduce的這一流程,我們也可以看出一些特點:

1、 Map任務之間是不通訊的,這與傳統的MPI(Message Passing Interface)存在本質區別,這就要求劃分後的任務具有獨立性。這個要求一方面限制了MapReduce的應用場合,但另一方面對於任務執行出錯後的處理十分方便,比如執行某個Map任務的機器掛掉了,可以不管其他Map任務,重新在另一臺機器上執行一遍即可。因為底層的資料在HDFS裡面,有3 份備份,所以資料冗餘搭配上Map的重執行這一能力,可以將叢集計算的容錯性相比MPI而言大大增強。後續博文會對MPI進行剖析,也會對 MapReduce與傳統高效能運算中的平行計算框架進行比較。

2、Map任務的分配與資料的分佈關係十分密切,對於上面的例子,這個100T的大檔案分佈在多臺機器上,MapReduce框架會根據檔案的實際儲存位置分配Map任務,這一過程需要對HDFS有好的理解,在後續博文中會對HDFS中進行剖析。到時候,能更好滴理解MapReduce框架。因為兩者是搭配起來使用的。

3、 MapReduce的輸入資料來自於HDFS,輸出結果也寫到HDFS。如果一個事情很複雜,需要分成很多個MapReduce作業反覆執行,那麼就需要來來回回地從磁碟中搬移資料的過程,速度很慢,後續博文會對Spark這一記憶體計算框架進行剖析,到時候,能更好滴理解MapReduce效能。

4、MapReduce的輸入資料和輸出結果也可以來自於HBase,HBase本身搭建於HDFS之上(理論上也可以搭建於其他檔案系統),這種應用場合大多需要MapReduce處理一些海量結構化資料。後續博文會對HBase進行剖析。
轉帖:http://www.cnblogs.com/esingchan/p/3917094.html?utm_source=tuicool&utm_medium=referral