MapReduce: Analyzing the Mapper and Reducer Principles Through WordCount
阿新 • Published: 2018-11-10
Mapper Stage
package com.zyd.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Input key:    LongWritable -- byte offset of the line within the file
 * Input value:  Text         -- the serialized String content of one line
 * Output key:   Text         -- a single word
 * Output value: IntWritable  -- the count for that word (always 1 here)
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Created once and reused, to avoid allocating new objects in the loop
    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Convert the incoming line to a String (it arrives as Text)
        String line = value.toString();

        // 2. Split the line into individual words on spaces
        String[] words = line.split(" ");

        // 3. Emit each word to the next stage in the form <word, 1>
        for (String word : words) {
            // Reusing k and v (created outside the loop) avoids the memory
            // cost of constructing a new object for every output record
            k.set(word);
            context.write(k, v);
        }
    }
}
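To make the map-side data flow concrete, here is a minimal standalone sketch (plain Java, no Hadoop runtime) of what the map step emits for one input line. The sample line and the printed format are illustrative assumptions, not part of the original job:

import java.util.Arrays;

// Standalone sketch of the map-side logic above (illustrative only).
public class MapSketch {
    public static void main(String[] args) {
        String line = "hello world hello";   // assumed sample input line
        for (String word : line.split(" ")) {
            // The real Mapper calls context.write(k, v); here we just print
            System.out.println("<" + word + ", 1>");
        }
    }
}

Running this prints <hello, 1>, <world, 1>, <hello, 1>: duplicates are not combined at map time; summing them is the reducer's job.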
Reducer Stage
package com.zyd.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The Reducer's input is the Mapper's output, so the serialized types must match.
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Called once per distinct key.
     * There are many <word, 1> pairs for the same key, hence the iterator.
     * context: where the output is written.
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum up the total count for this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        // Emit the word together with its total count
        context.write(key, new IntWritable(sum));
    }
}
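Between map and reduce, the framework shuffles and groups values by key, so for the sample line above the reducer would receive <hello, [1, 1]> and <world, [1]>. A minimal sketch of the summing step, with the grouped input hard-coded as an assumption:

import java.util.Arrays;
import java.util.List;

// Standalone sketch of the reduce-side summation (illustrative only).
public class ReduceSketch {
    public static void main(String[] args) {
        // Assumed shuffled input: all values grouped under the key "hello"
        String key = "hello";
        List<Integer> values = Arrays.asList(1, 1);
        int sum = 0;
        for (int count : values) {
            sum += count;   // same accumulation as reduce() above
        }
        System.out.println("<" + key + ", " + sum + ">");  // prints <hello, 2>
    }
}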
Driver Class
package com.zyd.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountRunner {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration and a Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 6. Point the job at the jar containing this program.
        //    job.setJar("/home/wc.jar") hard-codes a path that can change,
        //    so let the framework locate the jar from the class instead.
        job.setJarByClass(WordCountRunner.class);

        // 2. Register the mapper and reducer classes used by this job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 3. Declare the mapper's output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4. Declare the final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. Set the job's input directory and output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7. Submit the job's configuration, plus the jar containing the
        //    job's classes, to YARN and wait for completion
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
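A typical way to run the finished job from the command line, as a sketch; the jar name and HDFS paths below are placeholder assumptions, not from the original post:

hadoop jar wc.jar com.zyd.wc.WordCountRunner /user/zyd/input /user/zyd/output

Note that the output directory must not already exist: FileOutputFormat checks for it at submission time and fails the job if it does.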