1. 程式人生 > >Mapreduce例項---統計單詞個數(wordcount)

Mapreduce例項---統計單詞個數(wordcount)

一:問題介紹

統計每一個單詞在整個資料集中出現的總次數。

資料流程:


二:需要的jar包

Hadoop-2.4.1\share\hadoop\hdfs\hadoop-hdfs-2.4.1.jar
hadoop-2.4.1\share\hadoop\hdfs\lib\所有jar包

hadoop-2.4.1\share\hadoop\common\hadoop-common-2.4.1.jar
hadoop-2.4.1\share\hadoop\common\lib\所有jar包

hadoop-2.4.1\share\hadoop\mapreduce\除hadoop-mapreduce-examples-2.4.1.jar之外的jar包
hadoop-2.4.1\share\hadoop\mapreduce\lib\所有jar包

三:程式碼

mapper類實現:

/*
 * KEYIN:輸入kv資料對中key的資料型別
 * VALUEIN:輸入kv資料對中value的資料型別
 * KEYOUT:輸出kv資料對中key的資料型別
 * VALUEOUT:輸出kv資料對中value的資料型別
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	/*
	 * map方法是提供給map task程序來呼叫的,map task程序是每讀取一行文字來呼叫一次我們自定義的map方法
	 * map task在呼叫map方法時,傳遞的引數:
	 * 		一行的起始偏移量LongWritable作為key
	 * 		一行的文字內容Text作為value
	 */
	@Override
	protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
		//拿到一行文字內容,轉換成String 型別
		String line = value.toString();
		//將這行文字切分成單詞
		String[] words=line.split(" ");
		
		//輸出<單詞,1>
		for(String word:words){
			context.write(new Text(word), new IntWritable(1));
		}
	}
}

reducer類實現:

/*
 * KEYIN:對應mapper階段輸出的key型別
 * VALUEIN:對應mapper階段輸出的value型別
 * KEYOUT:reduce處理完之後輸出的結果kv對中key的型別
 * VALUEOUT:reduce處理完之後輸出的結果kv對中value的型別
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	/*
	 * reduce方法提供給reduce task程序來呼叫
	 * 
	 * reduce task會將shuffle階段分發過來的大量kv資料對進行聚合,聚合的機制是相同key的kv對聚合為一組
	 * 然後reduce task對每一組聚合kv呼叫一次我們自定義的reduce方法
	 * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1>
	 *  hello組會呼叫一次reduce方法進行處理,tom組也會呼叫一次reduce方法進行處理
	 *  呼叫時傳遞的引數:
	 *  		key:一組kv中的key
	 *  		values:一組kv中所有value的迭代器
	 */
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
		//定義一個計數器
		int count = 0;
		//通過value這個迭代器,遍歷這一組kv中所有的value,進行累加
		for(IntWritable value:values){
			count+=value.get();
		}
		
		//輸出這個單詞的統計結果
		context.write(key, new IntWritable(count));
	}
}

job提交客戶端實現:

public class WordCountJobSubmitter {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job wordCountJob = Job.getInstance(conf);
		
		//重要:指定本job所在的jar包
		wordCountJob.setJarByClass(WordCountJobSubmitter.class);
		
		//設定wordCountJob所用的mapper邏輯類為哪個類
		wordCountJob.setMapperClass(WordCountMapper.class);
		//設定wordCountJob所用的reducer邏輯類為哪個類
		wordCountJob.setReducerClass(WordCountReducer.class);
		
		//設定map階段輸出的kv資料型別
		wordCountJob.setMapOutputKeyClass(Text.class);
		wordCountJob.setMapOutputValueClass(IntWritable.class);
		
		//設定最終輸出的kv資料型別
		wordCountJob.setOutputKeyClass(Text.class);
		wordCountJob.setOutputValueClass(IntWritable.class);
		
		//設定要處理的文字資料所存放的路徑
		FileInputFormat.setInputPaths(wordCountJob, "hdfs://192.168.77.70:9000/wordcount/srcdata/");
		FileOutputFormat.setOutputPath(wordCountJob, new Path("hdfs://192.168.77.70:9000/wordcount/output/"));
		
		//提交job給hadoop叢集
		wordCountJob.waitForCompletion(true);
	}
}

四:操作流程

1、將專案打成jar包上傳到虛擬機器上

2、建立文字資料

3、將文字資料上傳到hdfs


4、執行jar檔案


5、結果