hadoop 中文詞頻排序 top-k 問題
本人最近一直在hadoop領域,摸爬滾打,由於最近老是佈置了一項作業:讓統計一個檔案中出現次數最高的單詞。一看到題目我就想用hadoop來實現這個問題,由於有現成的wordcount框架,所以就在這之上進行程式的修改新增即可。
準備過程:
1、我去下載了金庸的小說全集,順便分析分析,看他老人家筆下,誰的戲份更重。
2、由於是中文分詞,所以必須要有一箇中文分詞器,找到了一個java版的apache開源分詞器。效能還行,但是在一些詞語拆分上還是有些不夠智慧。它的演算法實現是用到了RMM演算法,在次不多介紹。
編碼設計:我用的是mapreduce老框架,並且程式實現需要連線多個mapreduce任務,所以就在此介紹mapreduce的設計思路吧
1、在wordcount的基礎上,第一個map實現分詞,並篩選出key/value鍵值對(如:張無忌 ---- 1)並輸出。map的分詞過程,會按照行進行劃分。
2、第一個個reduce接收第一個map的鍵值對,然後進行累加整合。reduce接收的是key/<value--list>同一個key的所有值會被整合成一個列表,作為reduce的輸入如(張無忌:1,1,1,1,1)。最終reduce加工後的輸出是key/value,如(張無忌 5)。
3、第二個map,此處是呼叫了hadoop的api,Inversemaper.class 其實現功能是將key和value位置進行互換。這樣做的原因是,hadoop預設對key進行升序排序,由於我們要統計的是最多的單詞,所以需要的數量進行排序,數量在第一個reduce的輸出中,為value,所以該步需要對key和value進行互換。
4、然後對key進行降序排序
5、最後一個map,作為計數器,顯示出詞頻出現最高的k個詞語。
這個程式差不多就介紹完了,這是我做的統計我們
4380張無忌
4623只見
4643兩人
4873黃蓉
4994不知
5055楊過
5285心想
5352令狐沖
5462聽得
5501師父
5569他們
5905心中
6173郭靖
6309武功
6487一聲
6697咱們
7336甚麼
7359笑道
7779一個
9828韋小寶
9872自己
13157說道
哈哈,發現,有9個老婆的韋小寶,果然是金庸老先生,濃墨重彩的
package chineseword; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.Reader; import java.io.ByteArrayInputStream; import java.io.StringReader; import java.util.Random; import java.util.StringTokenizer; import org.wltea.analyzer.core.IKSegmenter; import org.wltea.analyzer.core.Lexeme; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.Mapper.Context; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.map.InverseMapper; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class cword { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { byte[] bt = value.toString().getBytes();//**************************8 InputStream ip = new ByteArrayInputStream(bt); Reader read = new InputStreamReader(ip); // StringReader read=new StringReader(value.toString()); IKSegmenter iks = new IKSegmenter(read,true); Lexeme t=null; while ((t = iks.next()) != null) { word.set(t.getLexemeText()); // System.out.print(word+" "); context.write(word, one); } //System.out.println(); } } public static class TokenizerMapper2 extends Mapper<Object, Text,IntWritable,Text>{ int c=0; public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); IntWritable a=new IntWritable(Integer.parseInt(itr.nextToken())); Text b=new Text(itr.nextToken()); if((c<100)&b.getLength()>5){ //System.out.println("sss"); context.write(a, b); //System.out.println(" "+b.getLength()); c++; } } } private static class IntWritableDecreasingComparator extends IntWritable.Comparator { public int compare(WritableComparable a, WritableComparable b) { //System.out.println("ss"); return -super.compare(a, b); } public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) { //System.out.println("ss1"); return -super.compare(b1, s1, l1, b2, s2, l2); } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Path tempDir = new Path("wordcount-temp-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE))); //定義一個臨時目錄 Path tempDir2 = new Path("wordcount-temp2-" + Integer.toString( new Random().nextInt(Integer.MAX_VALUE))); //定義一個臨時目錄 Job job = new Job(conf, "word count"); job.setJarByClass(cword.class); try{ job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, tempDir);//先將詞頻統計任務的輸出結果寫到臨時目 //錄中, 下一個排序任務以臨時目錄為輸入目錄。 job.setOutputFormatClass(SequenceFileOutputFormat.class); if(job.waitForCompletion(true)) { Job sortJob = new Job(conf, "sort"); sortJob.setJarByClass(cword.class); FileInputFormat.addInputPath(sortJob, tempDir); sortJob.setInputFormatClass(SequenceFileInputFormat.class); /*InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換*/ sortJob.setMapperClass(InverseMapper.class); /*將 Reducer 的個數限定為1, 最終輸出的結果檔案就是一個。*/ sortJob.setNumReduceTasks(1); FileOutputFormat.setOutputPath(sortJob, tempDir2); sortJob.setOutputKeyClass(IntWritable.class); sortJob.setOutputValueClass(Text.class); /*Hadoop 預設對 IntWritable 按升序排序,而我們需要的是按降序排列。 * 因此我們實現了一個 IntWritableDecreasingComparator 類, * 並指定使用這個自定義的 Comparator 類對輸出結果中的 key (詞頻)進行排序*/ sortJob.setSortComparatorClass(IntWritableDecreasingComparator.class); if(sortJob.waitForCompletion(true)) { Job topJob = new Job(conf, "sort"); topJob.setJarByClass(cword.class); FileInputFormat.addInputPath(topJob, tempDir2); //topJob.setInputFormatClass(SequenceFileInputFormat.class); /*InverseMapper由hadoop庫提供,作用是實現map()之後的資料對的key和value交換*/ topJob.setMapperClass(TokenizerMapper2.class); /*將 Reducer 的個數限定為1, 最終輸出的結果檔案就是一個。*/ topJob.setNumReduceTasks(1); FileOutputFormat.setOutputPath(topJob, new Path(otherArgs[1])); topJob.setOutputKeyClass(IntWritable.class); topJob.setOutputValueClass(Text.class); System.exit(topJob.waitForCompletion(true) ? 0 : 1); } // System.exit(sortJob.waitForCompletion(true) ? 0 : 1); } }finally{ FileSystem.get(conf).deleteOnExit(tempDir); } } }