1. 程式人生 > >在WordCount基礎上改進,實現以詞頻為鍵值,並按詞頻降序排列

在WordCount基礎上改進,實現以詞頻為鍵值,並按詞頻降序排列

思路:

1、任務一:與WordCount.v1.0相同,但將處理結果以二進位制形式儲存到臨時目錄中,作為第二次MapReduce任務的輸入目錄
2、任務二:利用Hadoop提供的InverseMapper實現key與value位置互換,自定義一個IntWritableDecreasingComparator類,用於任務二的setSortComparatorClass( ),實現詞頻降序排列。

原始碼:


public class WordCount2 {

    public static class TokenizerMapper extends Mapper<Object
, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while
(itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws
IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } /** * 為實現倒序排序而寫 * */ private static class IntWritableDecreasingComparator extends IntWritable.Comparator { @Override public int compare(WritableComparable a, WritableComparable b) { // TODO Auto-generated method stub return -super.compare(a, b); } @Override public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { // TODO Auto-generated method stub return -super.compare(b1, s1, l1, b2, s2, l2); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); // validate the number of the args if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } // 定義一個臨時目錄 Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount2.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, tempDir); /* * 先將詞頻統計任務的輸出結果寫到臨時目錄中,下一個排序任務以臨時目錄為輸入目錄,此目錄最後在HDFS中尚未出現 * */ job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class); /* * SequenceFileOutputFormat為常用的OutputFormat類之一,寫適合後續MapReduce任務讀取的二進位制檔案( * 如果不進行setOutputFormatClass,那麼預設OutputFormat為TextOutputFormat,寫為文字行的形式) * */ if (job.waitForCompletion(true)) // 此if語句表明,只有當job任務成功執行完成以後才開始sortJob,引數true表明列印verbose資訊 { Job sortJob = Job.getInstance(conf, "sort"); /* * 在sortJob中我們並不指定Reduce類,因為不需要,Hadoop會使用預設的IdentityReducer類, * 將中間結果原樣輸出 */ sortJob.setJarByClass(WordCount2.class); FileInputFormat.addInputPath(sortJob, tempDir); sortJob.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class); sortJob.setMapperClass(org.apache.hadoop.mapreduce.lib.map.InverseMapper.class); // InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換 sortJob.setNumReduceTasks(1); // 將Reducer的個數限定為1,最終輸出的結果檔案就是一個 FileOutputFormat.setOutputPath(sortJob, new Path(otherArgs[1])); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class); /* * Hadoop預設對IntWritable按升序排序,而我們需要的是按降序排列。 * 因此我們實現了一個IntWritableDecreasingCompatator類,並指定使用這個自定義的Comparator類, * 對輸出結果中的key(詞頻)進行排序 */ System.exit(sortJob.waitForCompletion(true) ? 0 : 1); } System.exit(job.waitForCompletion(true) ? 0 : 1); } }