1. 程式人生 > >WordCount工作流程分析與總結

WordCount工作流程分析與總結

筆記目的: 1.總結分析MapReduce的基本流程 2.總結分析WordCount的工作流程 3.總結分析程式碼WordCount程式碼 筆記時間: 2012年10月10日 By Yikun Mail:[email protected] 1關於MapReduce. 1.1摘要
MapReduce is a programming model and an associated implementation for processing and generating large datasets that is amenable to a broad variety of real-world tasks. Users specify the computation in terms of a map and a reduce function, and the underlying runtime system automatically parallelizes the computation across large-scale clusters of machines, handles machine failures, and schedules inter-machine communication to make efficient use of the network and disks. Programmers find the system easy to use: more than ten thousand distinct MapReduce programs have been implemented internally at Google over the past four years, and an average of one hundred thousand MapReduce jobs are executed on Google's clusters every day, processing a total of more than twenty petabytes of data per day.  MapReduce是一個程式設計模型,也是一個處理和生成大型資料集的模型的相關方法,這個模型方法適用於實際中各種需要解決的任務。使用者指定mao/reduce的函式後,內部執行的系統會自動調動大規模的機器叢集進行並行運算、處理機器故障,並安排內部的機器通訊,使得更有效的利用網路與磁碟。程式設計師很容易去利用系統:在過去的四年中,有一萬多不同的MapReduce程式執行在Google內部的叢集上,並且這些叢集平均每天執行著十萬的MapReduce的任務,這些任務每天處理20PB的資料。 --摘自《MapReduce: Simplied Data Processing on Large Clusters 》(Google, Inc. Jeffrey Deanand, Sanjay Ghemawat) 1.2工作流程概覽
根據Google MapReduce的原文,MapReduce的過程主要有以下幾個階段 1.資料分割準備階段 使用者程式首先呼叫MapReduce庫將輸入檔案分成M個數據片度,每個資料片段的大小一般從 16MB到64MB(可以通過可選的引數來控制每個資料片段的大小)。然後使用者程式在機群中建立大量的程式副本。 2.Map/Reduce任務分配階段 這些程式副本中的有一個特殊的程式–master。副本中其它的程式都是worker程式,由master分配任務。有M個Map任務和R個Reduce任務將被分配,master將一個Map任務或Reduce任務分配給一個空閒worker。
3.worker讀取分塊資料並處理城中間檔案
被分配了map任務 worker程式讀取相關的輸入資料片段,從輸入的資料片段中解析出key/value pair ,然後把key/value pair傳遞給使用者自定義Map函式,由Map函式生成並輸出的中間key/value pair ,並快取在記憶體中。
4.本地寫入階段,並傳回Master,準備Reduce 快取中key/value pair通過分割槽函式分成R個區域,之後週期性的寫入到本地磁碟上。快取key/value pair在本地磁碟上的儲存位置將被回傳給master ,由master負責把這些儲存位置再傳送給Reduce worker。
5.Reduce讀取、排序資料階段 當Reduce worker程式接收到master程式發來的資料儲存位置資訊後,使用RPC從Map worker所在主機的磁碟上讀取這些快取資料。當Reduce worker讀取了所有的中間資料後,通過對key進行排序後使得具有相同key值的資料聚合在一起。由於許多不同key值會對映到相同Reduce任務上,因此必須進行排序。如果中間資料太大無法在記憶體中完成排序 ,那麼就要在外部進行排序。
6.Reduce寫入輸出資料階段
Reduce worker程式遍歷排序後的中間資料,對於每一個唯一中間key值,Reduce worker程式將這個key值和它相關的中間value值的集合傳遞給使用者自定義 Reduce函式。Reduce函式的輸出被追加到所屬分割槽的輸出檔案。
7.完成階段 當所有Map和Reduce任務都完成之後,master喚醒使用者程式。在這個時候,在使用者程式裡的對MapReduce呼叫才返回。  2關於wordcount的程式 2.1wordcount工作流程分析 wordcount是下來分析一下wordcount的工作流程,wordcount是一個利用mapreduce實現單詞計數的程式。
  • ·因為對wordcount的理解牽扯到了一些關於HDFS工作的情況,所以在這裡先簡單的總結下HDFS的工作流程。

    上圖參考《Hadoop權威指南》,我理解為,客戶端的一些大型的資料,利用hadoop的命令(hadoop fs -put)將源資料存放在datanode中,而namenode中僅存在的是這些檔案的對映,客戶端通過namenode的對映地址可以讀取到datanode的檔案。這樣便完成了檔案的分割,構成了一個分散式處理的系統。 具體到WordCount中,我個人理解可以分為2個部分,一個是檔案分塊、部分,讀寫部分,另外一個是程式處理方面。 2.1.1 檔案部分(HDFS)。
首先當執行hadoop fs -put後,資料便分割到每個datanode中了。 當處理的時候,結合後面,工作流程,Map過程,包括產生的中間檔案都是儲存在datanode的本地儲存的,也就是說,不上傳到hdfs,直到reduce過程完成之後,才進行最後的寫入hdfs。 2.1.2程式處理部分(MapRedeuce)
可以看到,當分割好的檔案在datanode中之後,進行Map,Map主要完成源資料到<key,value>的預處理,生成的中間檔案,然後進行合併後,由Reduce過程輸出到HDFS,最終完成整個過程。 2.2wordcount原始碼解析

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
*
* 描述:WordCount explains by York
  * @author Hadoop Dev Group
*/
public class WordCount {
    /**
     * 建立Mapper類TokenizerMapper繼承自泛型類Mapper
     * Mapper類:實現了Map功能基類
     * Mapper介面:
     * WritableComparable介面:實現WritableComparable的類可以相互比較。所有被用作key的類應該實現此介面。
     * Reporter 則可用於報告整個應用的執行進度,本例中未使用。
     *
     */
  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
        /**
         * IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 資料型別的類,這些類實現了WritableComparable介面,
         * 都能夠被序列化從而便於在分散式環境中進行資料交換,你可以將它們分別視為int,String 的替代品。
     * 宣告one常量和word用於存放單詞的變數
         */
    private final static IntWritable one =new IntWritable(1);
    private Text word =new Text();
    /**
         * Mapper中的map方法:
         * void map(K1 key, V1 value, Context context)
         * 對映一個單個的輸入k/v對到一箇中間的k/v對
         * 輸出對不需要和輸入對是相同的型別,輸入對可以對映到0個或多個輸出對。
         * Context:收集Mapper輸出的<k,v>對。
         * Context的write(k, v)方法:增加一個(k,v)對到context
         * 程式設計師主要編寫Map和Reduce函式.這個Map函式使用StringTokenizer函式對字串進行分隔,通過write方法把單詞存入word中
     * write方法存入(單詞,1)這樣的二元組到context中
     */ 
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr =new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
 
  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result =new IntWritable();
    /**
         * Reducer類中的reduce方法:
      * void reduce(Text key, Iterable<IntWritable> values, Context context)
         * 中k/v來自於map函式中的context,可能經過了進一步處理(combiner),同樣通過context輸出          
         */
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum =0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
        /**
         * Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工作
         */
    Configuration conf =new Configuration();
    String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length !=2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job =new Job(conf, "word count");    //設定一個使用者定義的job名稱
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);    //為job設定Mapper類
    job.setCombinerClass(IntSumReducer.class);    //為job設定Combiner類
    job.setReducerClass(IntSumReducer.class);    //為job設定Reducer類
    job.setOutputKeyClass(Text.class);        //為job的輸出資料設定Key類
    job.setOutputValueClass(IntWritable.class);    //為job輸出設定value類
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //為job設定輸入路徑
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//為job設定輸出路徑
    System.exit(job.waitForCompletion(true) ?0 : 1);        //執行job
  }
}