1. 程式人生 > >MapReduce第一個程式之WordCount

MapReduce第一個程式之WordCount

步驟

1) 建立 WordCount 類,繼承 Configured,實現 Tool 介面
2) 實現 Mapper 內部類
3) 實現 Reducer 內部類
4) 設定 Job 相關資訊
5) 提交 Job 執行

程式碼實現

/*
 * WordCount.java
 * com.hainiuxy
 * Copyright (c) 2018, 海牛版權所有.
 * @author   潘牛
 */

package com.hainiuxy;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Counts how many times each word occurs in text files.
 * <p>
 * Usage: {@code WordCount <input path> <output path>}. The output path is
 * deleted recursively before the job runs if it already exists.
 *
 * @author   潘牛
 * @Date	 2018-09-20
 */
public class WordCount extends Configured implements Tool{
/*
 * ******** Map input types are determined by the InputFormat ********
 * public class TextInputFormat extends FileInputFormat<LongWritable, Text>
 * Its createRecordReader() returns new LineRecordReader(recordDelimiterBytes);
 *
 * public class LineRecordReader extends RecordReader<LongWritable, Text>
 *
 * KEYIN:   LongWritable
 * VALUEIN: Text
 *
 *	For text input:
 *	KEYIN: byte offset of the line in the file, a long, wrapped as LongWritable
 *	one world
 *	one dream
 *	keyin	valuein
 *	0		one world
 *	10		one dream
 *
 *	VALUEIN: one line of data, a String, wrapped as Text
 *
 *	***** The two output types below are chosen by the business logic
 *	KEYOUT:   the word, Text
 *
 *	VALUEOUT: the count, LongWritable
 */
	public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

		/**
		 * Word separator: one or more whitespace characters. Splitting on a
		 * single " " would yield empty "words" for repeated spaces, leading
		 * spaces or blank lines.
		 */
		private static final Pattern WHITESPACE = Pattern.compile("\\s+");

		/**
		 * Reusable map output key: the current word.
		 */
		final Text keyOut = new Text();

		/**
		 * Reusable map output value: always 1 (one occurrence of the word).
		 */
		final LongWritable valueOut = new LongWritable(1L);

		/**
		 * Called once per input line; emits {@code <word, 1>} for every word on the line.
		 *
		 * @param key     byte offset of the line
		 * @param value   the line's text
		 * @param context used to emit the map output
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println("------------------------------");
			// e.g. "one world"
			String line = value.toString();
			System.out.println("map input:" + key.get() + ", " + line);

			// e.g. [one, world]
			for (String word : WHITESPACE.split(line)) {
				// Leading whitespace (or an empty line) yields one empty token; it is not a word.
				if (word.isEmpty()) {
					continue;
				}
				keyOut.set(word);

				// Emit <word, 1>
				context.write(keyOut, valueOut);

				System.out.println("map output:" + word + ", " + valueOut.get());
			}
		}
	}

	/*
	 * KEYIN, VALUEIN: must match the map output types.
	 *
	 * KEYOUT, VALUEOUT: chosen by the business logic
	 * KEYOUT:   the word, Text
	 * VALUEOUT: the count, LongWritable
	 */
	public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

		/**
		 * Reusable reduce output value: the final count for the current word.
		 */
		final LongWritable valueOut = new LongWritable();

		/**
		 * Called once per distinct key with all of that key's map output values,
		 * e.g. {@code <one, [1, 1, 1]>}; emits {@code <word, total>}.
		 *
		 * @param key     the word
		 * @param values  the partial counts emitted for this word
		 * @param context used to emit the reduce output
		 */
		@Override
		protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

			System.out.println("-------------------------");
			long sum = 0L;
			// Builds the trace line, e.g. "reduce input:one, [1,1,1]"
			StringBuilder sb = new StringBuilder("reduce input:").append(key.toString()).append(", [");
			String separator = "";
			for (LongWritable w : values) {
				long count = w.get();
				sb.append(separator).append(count);
				separator = ",";
				sum += count;
			}
			sb.append("]");
			System.out.println(sb.toString());

			// Reuse the output Writable instead of allocating one per key.
			valueOut.set(sum);
			context.write(key, valueOut);

			System.out.println("reduce output:" + key.toString() + ", " + valueOut.get());
		}

	}

	/**
	 * Configures and runs the word-count job, blocking until it finishes.
	 *
	 * @param args {@code args[0]} input path, {@code args[1]} output path
	 *             (deleted recursively first if it already exists)
	 * @return 0 if the job succeeded, 1 if it failed, 2 if fewer than two arguments were given
	 * @throws IOException if an existing output directory could not be deleted
	 */
	@Override
	public int run(String[] args) throws Exception {
		if (args == null || args.length < 2) {
			System.err.println("Usage: wordcount <input path> <output path>");
			return 2;
		}

		Configuration conf = getConf();
		Job job = Job.getInstance(conf, "wordcount");

		job.setJarByClass(WordCount.class);
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// The default is a single reducer; this only needs setting when a
		// different reducer count is wanted. Two reducers are used here.
		job.setNumReduceTasks(2);

		// Map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		// Final (reduce) output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		// TextInputFormat/TextOutputFormat are the defaults for text data; they are
		// set explicitly here. Any other format must be configured.
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));

		Path outputDir = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outputDir);

		// Delete an existing output directory so the job can be re-run. Resolve the
		// filesystem from the path itself so non-default schemes (e.g. file://) work.
		FileSystem fs = outputDir.getFileSystem(conf);
		if (fs.exists(outputDir)) {
			if (!fs.delete(outputDir, true)) {
				throw new IOException("failed to delete output dir " + outputDir);
			}
			System.out.println("output dir【" + outputDir.toString() + "】 is deleted");
		}

		// Submit the job and block until it completes.
		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Entry point; example arguments: {@code /tmp/mr/input /tmp/mr/output}.
	 *
	 * @param args command-line arguments, passed through {@link ToolRunner}
	 */
	public static void main(String[] args) throws Exception {
		System.exit(ToolRunner.run(new WordCount(), args));
	}

}