1. 程式人生 > >學習Hadoop MapReduce與WordCount例子分析

學習Hadoop MapReduce與WordCount例子分析

/*
MapReduce框架一直圍繞著key-value這樣的資料結構,下面以官方自帶的WordCount為例子,自己分析MapReduce的工作機制。MapReduce可以分為Map和Reduce過程,
程式碼實現了兩個類,分別是繼承Mapper和Reduceer,Mapper類裡面有map介面,Reduceer類有reduce介面,對於統計單詞這個例子來說,MapReduce會把檔案以行為
拆分物件,每分析一行就會呼叫Mapper類裡面的map介面,然後map接口裡面的程式碼由程式設計師實現其邏輯,然後把map介面處理完的結果輸送給Reduceer的reduce的接
口,中間還可以插入一個combiner的介面用於對map介面的資料進行中間結果處理再丟給reduce做最終的彙總。具體流程看程式碼註釋。
*/

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{
    /*
    Mapper他是一個模板類,Class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,
    KEYIN 輸入key的型別,VALUEIN輸入value的型別
    KEYOUT 輸出key的型別,VALUEOUT輸出value的型別
    四個型別決定了map介面的輸入與輸出型別

    比較形象地描述key,value,在map,combiner,reduce流轉的
    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

    其中還有規定,就是KEY和VALUE型別必須是實現了Writeable介面的,KEY型別還需要額外實現WritableComparable介面

    通常在Mapper模板裡面,KEYIN是不需要特指定的,就用基類Object就可以了,VAULEIN指定為Text,這個Text是<pre name="code" class="java">    org.apache.hadoop.io.Text,這個Text已經滿足了實現Writeable介面的條件了,在這個例子裡面VALUE就是檔案的行內
    容,所以定義型別為Text。
    對於KEYOUT和VALUEOUT,作為輸出key型別和value型別,這裡定義為Text和IntWritable,keyout就是需要統計單詞個數
    的單詞,IntWriteable就是對應某個單詞的次數,其實這個就是一個Int型別,為了符合介面需要所以就基礎了Writeable
    Context它是一個貫通map介面<-->combiner介面<-->reduce介面的上下文資料,在map接口裡面,單詞對應次數會儲存在context
    裡面,到了reduce介面,MapReduce會把之前map的context用key對應結果集合的形式給reduce介面。
    */

    private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }

    /*
    下面是對兩個檔案統計單詞呼叫map介面之後的context結果
    For the given sample input the first map emits:
    < Hello, 1>
    < World, 1>
    < Bye, 1>
    < World, 1>

    The second map emits:
    < Hello, 1>
    < Hadoop, 1>
    < Goodbye, 1>
    < Hadoop, 1>
    */


    public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { /*
    Reduceer也是一個類模板,跟Mapper一樣需要指定KEYIN,VALUEIN,KEYOUT,VALUEOUT,
    其中KEYIN和VALUEIN必須跟Mapper的KEYOUT,VALUEOUT一一對應,因為map介面輸出的結果key->value
    就是reduce介面的輸入,只是MapReduce框架把map接口裡面相同的key變成一個key->values
    的values集合,所以在reduce接口裡面KEYIN是Text也就是單詞,VALUEOUT是IntWriteable集合的
    迭代器Interable<IntWriteable>,context就是reduce的輸出結果了

    */

    private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }

    /*
    在例子裡面,還指定了combiner,其實cominer和reduce都是同一個介面reduce,第一次呼叫reduce介面是combiner過程,把每個檔案
    的單詞做了key->value 到 key->values的彙總,結果如下
    The output of the first map:
    < Bye, 1>
    < Hello, 1>
    < World, 2>

    The output of the second map:
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 1>
    */


    /*
    第二次呼叫reduce介面,就是reduce的過程,把combiner處理過的中間結果做一次最終的彙總
    < Bye, 1>
    < Goodbye, 1>
    < Hadoop, 2>
    < Hello, 2>
    < World, 2>
    */

    public static void main(String[] args) throws Exception { 
        Configuration conf = new Configuration(); 
        Job job = Job.getInstance(conf, "word count"); 
        job.setJarByClass(WordCount.class); 
        job.setMapperClass(TokenizerMapper.class); 
        job.setCombinerClass(IntSumReducer.class); 
        job.setReducerClass(IntSumReducer.class); 
        job.setOutputKeyClass(Text.class); 
        job.setOutputValueClass(IntWritable.class); 
        FileInputFormat.addInputPath(job, new Path(args[0])); 
        FileOutputFormat.setOutputPath(job, new Path(args[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1); 
    }
}