Improving on WordCount: keying by word frequency and sorting in descending order of frequency
Approach:
1. Job 1: the same as WordCount v1.0, except that its result is stored in binary form in a temporary directory, which then serves as the input directory of the second MapReduce job.
2. Job 2: use the InverseMapper provided by Hadoop to swap each key with its value, then define a custom IntWritableDecreasingComparator and pass it to the job's setSortComparatorClass() so that records are sorted by word frequency in descending order. For example, the pair (hello, 2) becomes (2, hello), and all pairs are then ordered from the largest IntWritable key to the smallest.
Source code:
import java.io.IOException;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {

    // Job 1 mapper: emits (word, 1) for every token of the input.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Job 1 reducer (also used as the combiner): sums the counts emitted for each word.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    /**
     * Comparator that reverses IntWritable's natural (ascending) order,
     * so that keys are sorted in descending order.
     */
    private static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }
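    // Example: for keys 3 and 5, super.compare(...) is negative under the default
    // ascending order, so negating it makes 5 sort ahead of 3; applied to all keys,
    // the sort order becomes descending.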
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Validate the number of command-line arguments.
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        // Create a randomly named temporary directory for the intermediate output
        // and register it for deletion when the JVM exits.
        Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
        FileSystem.get(conf).deleteOnExit(tempDir);

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, tempDir);
        /*
         * The word-count job writes its result to the temporary directory, and the
         * sort job below uses that directory as its input. Because the directory is
         * deleted on exit, it does not remain in HDFS afterwards.
         */
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
        /*
         * SequenceFileOutputFormat is one of the commonly used OutputFormat classes;
         * it writes binary files that are well suited as input to a follow-up
         * MapReduce job. (Without this setOutputFormatClass call, the default
         * OutputFormat is TextOutputFormat, which writes plain text lines.)
         */
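        // To inspect the intermediate result by hand, "hdfs dfs -text" can decode a
        // SequenceFile into readable text, e.g. hdfs dfs -text wordcount-temp-*/part-r-00000
        // (illustrative path; the actual name contains the random suffix chosen above).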
        // Launch the sort job only after the word-count job has completed successfully;
        // passing true makes waitForCompletion print verbose progress information.
        if (job.waitForCompletion(true)) {
            Job sortJob = Job.getInstance(conf, "sort");
            /*
             * No Reducer class is specified for sortJob because none is needed:
             * Hadoop falls back to the default identity Reducer, which writes the
             * intermediate records through unchanged.
             */
            sortJob.setJarByClass(WordCount2.class);
            FileInputFormat.addInputPath(sortJob, tempDir);
            sortJob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class);
            // InverseMapper is provided by the Hadoop library; it swaps the key and
            // value of every pair produced by the first job.
            sortJob.setMapperClass(org.apache.hadoop.mapreduce.lib.map.InverseMapper.class);
            // Limit the job to a single reducer so that everything ends up in one output file.
            sortJob.setNumReduceTasks(1);
            FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1]));
            sortJob.setOutputKeyClass(IntWritable.class);
            sortJob.setOutputValueClass(Text.class);
            /*
             * Hadoop sorts IntWritable keys in ascending order by default, but we
             * want descending order, so we register the custom
             * IntWritableDecreasingComparator defined above to sort the output by
             * its key, the word frequency.
             */
            sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);
            System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
        }
        // The word-count job failed; calling job.waitForCompletion(true) a second
        // time would throw an exception, so simply exit with a failure status.
        System.exit(1);
    }
}
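A minimal run sketch, assuming the class has been packed into a jar (the jar name and HDFS paths below are placeholders, not from the original post):

    hadoop jar wordcount2.jar WordCount2 /user/hadoop/input /user/hadoop/output
    hdfs dfs -cat /user/hadoop/output/part-r-00000

Because the sort job runs with a single reducer, part-r-00000 is the only result file; its lines are tab-separated (frequency, word) pairs in descending order of frequency, for example:

    9	the
    5	hadoop
    2	wordcount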