元資料與資料治理|MapReduce統計詞語出現次數(第五篇)
晨曦同學(Dota界號稱利神)前段時間分享了這樣一個問題:如何在一個很大的檔案中(該檔案包含了中英文)找出出現頻率比較高的幾個詞呢?我們來分析一下。找出現頻率比較高的詞語,首先要有一個支援中文的分詞器(IK,庖丁解牛等等),這個問題不大;分詞之後呢就要統計詞語出現次數,類似於MapReduce程式中WordCount,這可是學習MapReduce的hello world程式呀,當然很容易搞定;最後還要來個排序,統計完了我們期望出現次數高的詞語出現在前面,MapReduce預設就支援排序,也沒問題。
解決這個問題需要兩個Job,一個是統計Job,一個是排序Job。
統計Job的Mapper需要做的事情就是分詞,這裡我們選用IKanalyzer分詞器,可能IK在官網上不好下載,我給大家準備好了,
public static class AnalyzerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { breakupSentence(value.toString(), context); } /** * 用分詞器將一段話拆分成多個詞。 * 分出一個詞就將數量置為1。 * * @param sentence * @param context * @throws IOException * @throws InterruptedException */ private void breakupSentence(String sentence, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { Analyzer analyzer = new IKAnalyzer(true); TokenStream tokenStream = analyzer.tokenStream("content", new StringReader(sentence)); tokenStream.addAttribute(CharTermAttribute.class); while (tokenStream.incrementToken()) { CharTermAttribute charTermAttribute = tokenStream .getAttribute(CharTermAttribute.class); word.set(charTermAttribute.toString()); context.write(word, one); } } }
別忘了給IK設定停止詞字典,過濾掉那些"了",”呢“,”啊“,”的“,"is", "and", "a" 之類的語氣詞、助詞、連詞、量詞等。
IKAnalyzer.cfg.xml
<properties> <comment>IK Analyzer 擴充套件配置</comment> <!--使用者可以在這裡配置自己的擴充套件字典 <entry key="ext_dict">ext.dic;</entry> --> <!--使用者可以在這裡配置自己的擴充套件停止詞字典--> <entry key="ext_stopwords">stopword.dic;chinese_stopword.dic</entry> </properties>
chinese_stopword.dic
的
呢
吧
和
......
統計Job的Reducer就是統計各個詞語的出現次數,跟WordCount程式中的完全一致,不再煩述。我們可以將該Reducer設定為Job的CombinerClass,這樣每次Mapper Task向Reducer Task傳遞資料時候,先執行Combiner,將結果先做個統計,減少了Mapper向Reducer的資料傳輸。
public static class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
接下來再看排序Job,MapReduce任務是通過key來排序的,我們需要將詞語出現的次數排序,所以需要先將統計Job的結果Key-Value互換,排序完成後,再換回來即可。
排序Job的Mapper將統計Job的結果Key-Value互換,程式碼如下:
public static class SortMapper extends Mapper<Object, Text, IntWritable, Text> {
private final static IntWritable wordCount = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value,
Mapper<Object, Text, IntWritable, Text>.Context context)
throws IOException, InterruptedException {
StringTokenizer tokenizer = new StringTokenizer(value.toString());
while (tokenizer.hasMoreTokens()) {
String a = tokenizer.nextToken().trim();
word.set(a);
String b = tokenizer.nextToken().trim();
wordCount.set(Integer.valueOf(b));
context.write(wordCount, word);
}
}
}
排序Job的Reducer任務就是再將Key-Value倒置過來。
public static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
private Text result = new Text();
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
for (Text val : values) {
result.set(val.toString());
context.write(result, key);
}
}
}
Reducer預設排序是從小到大(數字),而我們期望出現次數多的詞語排在前面,所以需要重寫排序類WritableComparator。
public class DescWritableComparator extends WritableComparator {
protected DescWritableComparator() {
super(IntWritable.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
return -super.compare(a, b);
}
}
如果有多個Reducer任務,Reducer的預設排序只是對傳送到該Reducer下的資料區域性排序。如果想達到全域性排序,需要我們手動去寫partitioner。Partitioner的作用是根據不同的key,制定相應的規則分發到不同的Reducer中。
public static class SortPartitioner<K, V> extends Partitioner<K, V> {
@Override
public int getPartition(K key, V value, int numReduceTasks) {
int maxValue = 50;
int keySection = 0;
// 只有傳過來的key值大於maxValue 並且numReduceTasks比如大於1個才需要分割槽,否則直接返回0
if (numReduceTasks > 1 && key.hashCode() < maxValue) {
int sectionValue = maxValue / (numReduceTasks - 1);
int count = 0;
while ((key.hashCode() - sectionValue * count) > sectionValue) {
count++;
}
keySection = numReduceTasks - 1 - count;
}
return keySection;
}
}
最後就是連結MapReduce Job流,這裡有兩個Job,需要先執行統計Job,再執行排序Job。我們需要將統計Job的輸出作為排序Job的輸入。(友情提示:別忘了給統計Job設定Combiner哦,也別忘了給排序Job設定Comparator和Partitioner哦。)
Job job1 = new Job(configuration, "key word analyzer");
job1.setJarByClass(JobDefiner.class);
job1.setMapperClass(AnalyzerMapper.class);
job1.setCombinerClass(CountReducer.class);
job1.setReducerClass(CountReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
Path outPath1 = new Path(otherArgs[1]);
FileOutputFormat.setOutputPath(job1, outPath1);
job1.waitForCompletion(true);
Job job2 = new Job(configuration, "result sort");
job2.setJarByClass(JobDefiner.class);
job2.setOutputKeyClass(IntWritable.class);
job2.setOutputValueClass(Text.class);
job2.setMapperClass(SortKeyWordHandler.SortMapper.class);
job2.setReducerClass(SortKeyWordHandler.SortReducer.class);
// key按照降序排列
job2.setSortComparatorClass(DescWritableComparator.class);
job2.setPartitionerClass(SortKeyWordHandler.SortPartitioner.class);
FileInputFormat.addInputPath(job2, outPath1);
FileOutputFormat.setOutputPath(job2, new Path(otherArgs[2]));
job2.waitForCompletion(true);
大功告成?且慢!!在我的部落格Eclipse遠端除錯Hadoop叢集中,我們只講瞭如何配置本地Eclipse如何遠端除錯Hadoop叢集,在這裡我們就演示一下如何去跑。
我們先上傳兩篇關於習大大的報道到hdfs上
bin/hadoop dfs -mkdir input
bin/hadoop dfs -put mupeng/files/test_chinese* input
刷一下Eclipse裡面DFS Location就能看到
找到定義Job的main方法類,右鍵Run As=>Run Configurations ...
確認Project、Main class準確後,設定main方法的引數:統計Job的輸入路徑、統計Job的輸出路徑(同時也是排序Job的輸入路徑)、排序Job的輸出路徑。
hdfs://192.168.248.149:9000/user/mupeng/input
hdfs://192.168.248.149:9000/user/mupeng/output1
hdfs://192.168.248.149:9000/user/mupeng/output2
設定好後,點選Run,在第二個輸出路徑中,我們看到結果(我這隻有一個Reducer)
引用 20
強調 16
習近平 14
斐濟 13
我們 13
斐 12
中國 11
對 10
中 10
等 9
中方 9
為 9
方 9
......
最後提示大家:
本文相關原始碼下載地址(GitHub):點選檢視
相關部落格地址:Eclipse遠端除錯Hadoop叢集