1. 程式人生 > >hadoop 中文詞頻排序 top-k 問題

hadoop 中文詞頻排序 top-k 問題

    本人最近一直在hadoop領域,摸爬滾打,由於最近老是佈置了一項作業:讓統計一個檔案中出現次數最高的單詞。一看到題目我就想用hadoop來實現這個問題,由於有現成的wordcount框架,所以就在這之上進行程式的修改新增即可。

    準備過程:

    1、我去下載了金庸的小說全集,順便分析分析,看他老人家筆下,誰的戲份更重。

    2、由於是中文分詞,所以必須要有一箇中文分詞器,找到了一個java版的apache開源分詞器。效能還行,但是在一些詞語拆分上還是有些不夠智慧。它的演算法實現是用到了RMM演算法,在次不多介紹。

    編碼設計:我用的是mapreduce老框架,並且程式實現需要連線多個mapreduce任務,所以就在此介紹mapreduce的設計思路吧

    1、在wordcount的基礎上,第一個map實現分詞,並篩選出key/value鍵值對(如:張無忌  ----  1)並輸出。map的分詞過程,會按照行進行劃分。

    2、第一個個reduce接收第一個map的鍵值對,然後進行累加整合。reduce接收的是key/<value--list>同一個key的所有值會被整合成一個列表,作為reduce的輸入如(張無忌:1,1,1,1,1)。最終reduce加工後的輸出是key/value,如(張無忌  5)。

    3、第二個map,此處是呼叫了hadoop的api,Inversemaper.class 其實現功能是將key和value位置進行互換。這樣做的原因是,hadoop預設對key進行升序排序,由於我們要統計的是最多的單詞,所以需要的數量進行排序,數量在第一個reduce的輸出中,為value,所以該步需要對key和value進行互換。   

    4、然後對key進行降序排序

    5、最後一個map,作為計數器,顯示出詞頻出現最高的k個詞語。

    這個程式差不多就介紹完了,這是我做的統計我們
4380張無忌
4623只見
4643兩人
4873黃蓉
4994不知
5055楊過
5285心想
5352令狐沖
5462聽得
5501師父
5569他們
5905心中
6173郭靖
6309武功
6487一聲
6697咱們
7336甚麼
7359笑道
7779一個
9828韋小寶
9872自己
13157說道

    哈哈,發現,有9個老婆的韋小寶,果然是金庸老先生,濃墨重彩的

package chineseword;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.ByteArrayInputStream;
import java.io.StringReader;
import java.util.Random;
import java.util.StringTokenizer;

import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class cword {
    
      public static class TokenizerMapper 
           extends Mapper<Object, Text, Text, IntWritable>{
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
            	byte[] bt = value.toString().getBytes();//**************************8
                InputStream ip = new ByteArrayInputStream(bt);
                Reader read = new InputStreamReader(ip);
//        	StringReader read=new StringReader(value.toString());
                IKSegmenter iks = new IKSegmenter(read,true);
                Lexeme t=null;
                while ((t = iks.next()) != null)
                {
                    word.set(t.getLexemeText());
                   // System.out.print(word+" ");
                    context.write(word, one);
                }
                
                //System.out.println();
        }
      }
      
      public static class TokenizerMapper2 
      extends Mapper<Object, Text,IntWritable,Text>{
  	  int c=0;
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString());
          IntWritable a=new IntWritable(Integer.parseInt(itr.nextToken()));
          Text b=new Text(itr.nextToken());
          
          if((c<100)&b.getLength()>5){
   	       //System.out.println("sss");
            context.write(a, b);
            //System.out.println(" "+b.getLength());
            c++;
          }
        }
       }
      
      private static class IntWritableDecreasingComparator extends IntWritable.Comparator {  
           public int compare(WritableComparable a, WritableComparable b) { 
         	  //System.out.println("ss");
             return -super.compare(a, b);  
           }  
             
           public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
         	  //System.out.println("ss1");
               return -super.compare(b1, s1, l1, b2, s2, l2);  
           }  
       }  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {  
      Configuration conf = new Configuration();  
      String[] otherArgs = new GenericOptionsParser(conf, args)  
              .getRemainingArgs();  
      if (otherArgs.length != 2) {  
          System.err.println("Usage: wordcount <in> <out>");  
          System.exit(2);  
      }  
       Path tempDir = new Path("wordcount-temp-" + Integer.toString(  
                  new Random().nextInt(Integer.MAX_VALUE))); //定義一個臨時目錄  
       Path tempDir2 = new Path("wordcount-temp2-" + Integer.toString(  
               new Random().nextInt(Integer.MAX_VALUE))); //定義一個臨時目錄  
        
      Job job = new Job(conf, "word count");  
      job.setJarByClass(cword.class);  
      try{  
          job.setMapperClass(TokenizerMapper.class);  
          job.setCombinerClass(IntSumReducer.class);  
          job.setReducerClass(IntSumReducer.class);  
            
          job.setOutputKeyClass(Text.class);  
          job.setOutputValueClass(IntWritable.class);  
            
          FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
          FileOutputFormat.setOutputPath(job, tempDir);//先將詞頻統計任務的輸出結果寫到臨時目  
                                                       //錄中, 下一個排序任務以臨時目錄為輸入目錄。  
          job.setOutputFormatClass(SequenceFileOutputFormat.class);  
          if(job.waitForCompletion(true))  
          {  
              Job sortJob = new Job(conf, "sort");  
              sortJob.setJarByClass(cword.class);  
                
              FileInputFormat.addInputPath(sortJob, tempDir);  
              sortJob.setInputFormatClass(SequenceFileInputFormat.class);  
                
              /*InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換*/  
              sortJob.setMapperClass(InverseMapper.class);  
              /*將 Reducer 的個數限定為1, 最終輸出的結果檔案就是一個。*/  
              sortJob.setNumReduceTasks(1);   
              FileOutputFormat.setOutputPath(sortJob, tempDir2);  
                
              sortJob.setOutputKeyClass(IntWritable.class);  
              sortJob.setOutputValueClass(Text.class);  
              /*Hadoop 預設對 IntWritable 按升序排序,而我們需要的是按降序排列。 
               * 因此我們實現了一個 IntWritableDecreasingComparator 類,  
               * 並指定使用這個自定義的 Comparator 類對輸出結果中的 key (詞頻)進行排序*/  
              sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class);  
              if(sortJob.waitForCompletion(true))
              {
              	Job topJob = new Job(conf, "sort");  
              	topJob.setJarByClass(cword.class);  
                    
                  FileInputFormat.addInputPath(topJob, tempDir2);  
                  //topJob.setInputFormatClass(SequenceFileInputFormat.class);  
                    
                  /*InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換*/  
                  topJob.setMapperClass(TokenizerMapper2.class);  
                  /*將 Reducer 的個數限定為1, 最終輸出的結果檔案就是一個。*/  
                  topJob.setNumReduceTasks(1);   
                  FileOutputFormat.setOutputPath(topJob, new Path(otherArgs[1]));  
                    
                  topJob.setOutputKeyClass(IntWritable.class);  
                  topJob.setOutputValueClass(Text.class);  
                  System.exit(topJob.waitForCompletion(true) ? 0 : 1);  
              }
              
     
             // System.exit(sortJob.waitForCompletion(true) ? 0 : 1);  
          }  
      }finally{  
          FileSystem.get(conf).deleteOnExit(tempDir);  
      }  
  }  
}