1. 程式人生 > >一個mapreduce例項加註釋

一個mapreduce例項加註釋

1.WCMapper.java

package cn.itcast.hadoop.mr.wordcount;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//4個泛型中,前兩個是指定mapper輸入資料的型別,KEYIN是輸入的key的型別,VALUEIN是輸入的value的型別
//map 和 reduce 的資料輸入輸出都是以 key-value對的形式封裝的
//預設情況下,框架傳遞給我們的mapper的輸入資料中,key是要處理的文字中一行的起始偏移量,這一行的內容作為value
public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
 
 //mapreduce框架每讀一行資料就呼叫一次該方法
 @Override
 protected void map(LongWritable key, Text value,Context context)
   throws IOException, InterruptedException {
  //具體業務邏輯就寫在這個方法體中,而且我們業務要處理的資料已經被框架傳遞進來,在方法的引數中 key-value
  //key 是這一行資料的起始偏移量     value 是這一行的文字內容
  
  //將這一行的內容轉換成string型別
  String line = value.toString();
  
  //對這一行的文字按特定分隔符切分
  String[] words = StringUtils.split(line, " ");
  
  //遍歷這個單詞陣列輸出為kv形式  k:單詞   v : 1
  for(String word : words){
   
   context.write(new Text(word), new LongWritable(1));
   
  }

  

 }

 
 
}

2.WCReducer.java

package cn.itcast.hadoop.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
 
 
 
 //框架在map處理完成之後,將所有kv對快取起來,進行分組,然後傳遞一個組<key,valus{}>,呼叫一次reduce方法
 //<hello,{1,1,1,1,1,1.....}>
 @Override
 protected void reduce(Text key, Iterable<LongWritable> values,Context context)
   throws IOException, InterruptedException {

  long count = 0;
  //遍歷value的list,進行累加求和
  for(LongWritable value:values){
   
   count += value.get();
  }
  
  //輸出這一個單詞的統計結果
  
  context.write(key, new LongWritable(count));
  
 }
 
 

}

3.WCRunner.java

package cn.itcast.hadoop.mr.wordcount;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 用來描述一個特定的作業
 * 比如,該作業使用哪個類作為邏輯處理中的map,哪個作為reduce
 * 還可以指定該作業要處理的資料所在的路徑
 * 還可以指定改作業輸出的結果放到哪個路徑
 * ....
 *  *
 */
public class WCRunner {

 public static void main(String[] args) throws Exception {
  
  Configuration conf = new Configuration();
  
  Job wcjob = Job.getInstance(conf);
  
  //設定整個job所用的那些類在哪個jar包
  wcjob.setJarByClass(WCRunner.class);
  
  
  //本job使用的mapper和reducer的類
  wcjob.setMapperClass(WCMapper.class);
  wcjob.setReducerClass(WCReducer.class);
  
  
  //指定reduce的輸出資料kv型別
  wcjob.setOutputKeyClass(Text.class);
  wcjob.setOutputValueClass(LongWritable.class);
  
  //指定mapper的輸出資料kv型別
  wcjob.setMapOutputKeyClass(Text.class);
  wcjob.setMapOutputValueClass(LongWritable.class);
  
  
  //指定要處理的輸入資料存放路徑
  FileInputFormat.setInputPaths(wcjob, new Path("hdfs://weekend110:9000/wc/srcdata/"));
  
  //指定處理結果的輸出資料存放路徑
  FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://weekend110:9000/wc/output3/"));
  
  //將job提交給叢集執行 
  wcjob.waitForCompletion(true);
  
  
 }
 
 
 
 
}