MapReduce第一個程式之WordCount
阿新 • • 發佈:2018-12-13
步驟
1)建立 WordCount 類,繼承 Configured 並實現 Tool 介面 2)實現 Mapper 內部類 3)實現 Reducer 內部類 4)設定 Job 相關資訊 5)提交 Job 執行
程式碼實現
* WordCount.java * com.hainiuxy * Copyright (c) 2018, 海牛版權所有. * @author 潘牛 */ package com.hainiuxy; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * 統計文字檔案單詞的個數 * @author 潘牛 * @Date 2018年9月20日 */ public class WordCount extends Configured implements Tool{ /* * ********輸入型別的確定,跟輸入的Format類有關係******* * public class TextInputFormat extends FileInputFormat<LongWritable, Text> * 有個 createRecordReader() 返回值型別是 return new LineRecordReader(recordDelimiterBytes); * * public class LineRecordReader extends RecordReader<LongWritable, Text> * * keyin:LongWritable * valuein:Text * 對於文字來說, keyIn: 行位元組的偏移量, long型別,封裝類是:LongWritable one world one dream kyein valuein 0 one world 10 one dream VALUEIN:一行資料 , String型別,封裝類是:Text *****下面兩個型別的確定是跟業務有關係 KEYOUT:單詞, Text VALUEOUT:數量, LongWritable */ public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ /** * map輸出的key: 單詞 */ Text keyOut = new Text(); /** * map 輸出 的value : 數值 * 封裝數值 */ LongWritable valueOut = new LongWritable(1L); //map():每行呼叫一次 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { System.out.println("------------------------------"); //one world String line = value.toString(); System.out.println("map input:" + key.get() + ", " + line); //[one, world] String[] 
splits = line.split(" "); for(String word : splits){ //封裝單詞 keyOut.set(word); //輸出 <單詞,1>的形式 context.write(keyOut, valueOut); System.out.println("map output:" + word + ", " + valueOut.get()); } } } /* * keyIn, VALUEIN:map輸出什麼型別,reduce就輸入什麼型別 * * KEYOUT,VALUEOUT:是跟業務有關係的 * KEYOUT:單詞, Text * VALUEOUT:數量, LongWritable */ public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ /** * reduce輸出的value:最終單詞統計的結果 * 封裝數值 */ LongWritable valueOut = new LongWritable(); //reduce():一個key呼叫一次 @Override //在這裡,reduce步的輸入相當於<單詞,valuelist>,如<Hello,<1,1>> protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { System.out.println("-------------------------"); //key:one //values:[1,1,1] //使用者累加 long sum = 0L; StringBuilder sb = new StringBuilder(); //one,[1,1,1] sb.append("reduce input:" + key.toString() + ", ["); for(LongWritable w : values){ sb.append(w.get()).append(","); sum += w.get(); } sb.deleteCharAt(sb.length() - 1).append("]"); System.out.println(sb.toString()); //valueOut.set(sum); //輸出統計結果 context.write(key, new LongWritable(sum)); System.out.println("reduce output:" + key.toString() + ", " + valueOut.get()); } } @Override public int run(String[] args) throws Exception { //獲取Configuration物件 Configuration conf = getConf(); //建立job物件和job的名字wordcount Job job = Job.getInstance(conf, "wordcount"); //設定job引數 //設定job執行類 job.setJarByClass(WordCount.class); //設定任務mapper執行類 job.setMapperClass(WordCountMapper.class); //設定任務reducer執行類 job.setReducerClass(WordCountReducer.class); // 【預設就一個reduce】如果預設,不需要設定,只有reduce個數!=1時設定, // 設定2個reduce job.setNumReduceTasks(2); //設定任務mapper輸出的key的型別 job.setMapOutputKeyClass(Text.class); //設定任務mapper輸出的value的型別 job.setMapOutputValueClass(LongWritable.class); //設定最終輸出的key型別 job.setOutputKeyClass(Text.class); //設定最終輸出的value型別 job.setOutputValueClass(LongWritable.class); // 設定輸入的格式:【預設是TextInputFormat.class】如果是文字,可以不寫;如果是其他的就必須設定此項 
job.setInputFormatClass(TextInputFormat.class); // 【預設是TextOutputFormat.class】如果是文字,可以不寫;如果是其他的就必須設定此項 job.setOutputFormatClass(TextOutputFormat.class); //設定任務的輸入目錄 FileInputFormat.addInputPath(job, new Path(args[0])); //輸出目錄Path物件 Path outputDir = new Path(args[1]); //設定任務的輸出目錄 FileOutputFormat.setOutputPath(job, outputDir); //自動刪除輸出目錄 FileSystem fs = FileSystem.get(conf); if(fs.exists(outputDir)){ //遞迴刪目錄 fs.delete(outputDir, true); System.out.println("output dir【" + outputDir.toString() + "】 is deleted"); } //執行job任務, 阻塞的方法 //boolean status = job.waitForCompletion(true); //執行job return (job.waitForCompletion(true)) ? 0 : 1; } public static void main(String[] args) throws Exception { // /tmp/mr/input /tmp/mr/output System.exit(ToolRunner.run(new WordCount(), args)); } }