1. 程式人生 > >Hadoop2.0 Mapreduce例項WordCount體驗

Hadoop2.0 Mapreduce例項WordCount體驗

     在Hadoop2.0中MapReduce程式的都需要繼承org.apache.hadoop.mapreduce.Mapper 和 org.apache.hadoop.mapreduce.Reducer這兩個基礎類,來定製自己的mapreduce功能,原始碼中主要的函式如下

Mapper.java

public void run(Context context) throws IOException, InterruptedException {
    setup(context);     // Called once at the beginning of the task.
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);   // Called once at the end of the task.
  }
 }

  /**
   * Called once for each key/value pair in the input split. Most applications
   * should override this, but the default is the identity function.
   */
  protected void map(KEYIN key, VALUEIN value, 
                     Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
Reducer.java
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);     // Called once at the beginning of the task.
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);   // Called once at the end of the task.
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

       在Mapper和Reducer類中,都有一個run()方法不斷提供(key,value)來呼叫map()和reduce()函式來處理,我們一般只需重寫其中的map和reduce方法。在mapreduce中只有支援序列化的類才能作為鍵值,其中的key還必須要是可比較的,故 key要實現WritableComparable介面,value只需要實現Writable介面。

如下給出自己參照原始碼寫的MyWordCount.java

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyWordCount {

	public static class WordCountMapper extends Mapper<Object,Text,Text,IntWritable> {
        
		private static final IntWritable one =  new IntWritable(1);
		
		private Text word = new Text();

		protected void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			StringTokenizer words = new StringTokenizer(line);
			while(words.hasMoreTokens()) {
				word.set(words.nextToken());
				context.write(word, one);
			}
		}
	}
	
	public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {

		private IntWritable totalNum = new IntWritable();
		
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
		   int sum = 0;
		   Iterator<IntWritable> it = values.iterator();
                   while(it.hasNext()) {
        	       sum += it.next().get();
                   }
                   totalNum.set(sum);
                   context.write(key,totalNum);	
		}
	}
	
	public static void main(String[] args) throws Exception{
		Configuration conf = new Configuration();           

		Job job = new Job(conf,"MyWordCount");
		
		job.setJarByClass(MyWordCount.class); //設定執行jar中的class名稱
		
		job.setMapperClass(WordCountMapper.class);//設定mapreduce中的mapper reducer combiner類
		job.setReducerClass(WordCountReducer.class);
                job.setCombinerClass(WordCountReducer.class); 
		
		job.setOutputKeyClass(Text.class); //設定輸出結果鍵值對型別
                job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job,new Path(args[0]));//設定mapreduce輸入輸出檔案路徑
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0:1);
	}
	
}