MapReduce 用 TreeMap 實現 TopN
阿新 • • 發佈:2018-10-31
首先有如下資料,要統計每個頁面的訪問量,然後計算訪問量最大的五個頁面
2017/07/28 qq.com/a 2017/07/28 qq.com/bx 2017/07/28 qq.com/by 2017/07/28 qq.com/by3 2017/07/28 qq.com/news 2017/07/28 sina.com/news/socail 2017/07/28 163.com/ac 2017/07/28 sina.com/news/socail 2017/07/28 163.com/sport 2017/07/28 163.com/ac 2017/07/28 sina.com/play 2017/07/28 163.com/sport 2017/07/28 163.com/ac 2017/07/28 sina.com/movie 2017/07/28 sina.com/play 2017/07/28 sina.com/movie 2017/07/28 163.com/sport 2017/07/28 sina.com/movie 2017/07/28 163.com/ac 2017/07/28 163.com/ac 2017/07/28 163.com/acc 2017/07/28 qq.com/by 2017/07/28 qq.com/by3 2017/07/28 qq.com/news 2017/07/28 163.com/sport
1 Map階段
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class PageTopnMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { //切分資料 String line = value.toString(); String[] split = line.split(" "); //把資料和1組合 context.write(new Text(split[1]), new IntWritable(1)); } }
2 寫一個 JavaBean 用於封裝頁面和頁面的訪問量,並實現 Comparable 介面,重寫 compareTo 方法(主要是為了規定將這個 JavaBean 放入 TreeMap 時按什麼排序)
public class PageCount implements Comparable<PageCount>{ private String page; private int count; public void set(String page, int count) { this.page = page; this.count = count; } public String getPage() { return page; } public void setPage(String page) { this.page = page; } public int getCount() { return count; } public void setCount(int count) { this.count = count; } @Override public int compareTo(PageCount o) { return o.getCount()-this.count==0?this.page.compareTo(o.getPage()):o.getCount()-this.count; } }
3 reduce階段
import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reduce phase: sums the access count per page, keeps the results in a
 * TreeMap ordered by PageCount.compareTo (count descending), and in
 * cleanup() writes out the top-n pages ("top.n" config, default 5).
 */
public class PageTopnReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
// TreeMap used as an ordered set: the PageCount key carries all data, the
// value is always null. Trimmed to at most `topn` entries after each insert.
TreeMap<PageCount, Object> treeMap = new TreeMap<>();
// Number of pages to emit; read once in setup() so reduce() can bound memory.
private int topn;
@Override
protected void setup(Context context) {
Configuration conf = context.getConfiguration();
topn = conf.getInt("top.n", 5);
}
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
// Total access count for this page.
int count = 0;
for (IntWritable value : values) {
count += value.get();
}
// Wrap page + count in the comparable bean and insert in sorted order.
PageCount pageCount = new PageCount();
pageCount.set(key.toString(), count);
treeMap.put(pageCount, null);
// Keep only the top-n entries: drop the smallest (last) one. Without this
// the map would retain every distinct page until cleanup().
if (treeMap.size() > topn) {
treeMap.pollLastEntry();
}
}
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
// The map already holds at most topn entries in descending-count order;
// emit them all.
for (Entry<PageCount, Object> entry : treeMap.entrySet()) {
context.write(new Text(entry.getKey().getPage()), new IntWritable(entry.getKey().getCount()));
}
}
}