分組Top N問題(二)
前言:
在Hadoop中,排序是MapReduce的靈魂,MapTask和ReduceTask均會對資料按Key排序,這個操作是MR框架的預設行為,不管你的業務邏輯上是否需要這一操作。
技術點:
MapReduce框架中,用到的排序主要有兩種:快速排序和基於堆實現的優先順序佇列(PriorityQueue)。
Mapper階段:
從map輸出到環形緩衝區的資料會被排序(這是MR框架中改良的快速排序),這個排序涉及partition和key,當緩衝區容量佔用80%,會spill資料到磁碟,生成IFile檔案,Map結束後,會將IFile檔案排序合併成一個大檔案(基於堆實現的優先順序佇列),以供不同的reduce來拉取相應的資料。
Reducer階段:
從Mapper端取回的資料已是部分有序,Reduce Task只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop將sort階段和reduce階段並行化,在sort階段,Reduce Task為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將key相同的資料順次交給reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort和reduce可以並行進行。
分組Top n分析:
在資料處理中,經常會碰到這樣一個場景,對錶資料按照某一欄位分組,然後找出各自組內最大的幾條記錄情形。針對這種分組Top N問題,我們利用Hive、MapReduce等多種工具實現一下。
場景模擬:
對類如下users表記錄,取出不同grade下得分最多的兩條記錄
id grade score
1 A 10
2 A 40
3 B 30
4 C 20
5 B 10
6 D 40
7 A 30
8 C 20
9 B 10
10 D 40
11 C 30
12 D 20
最簡單的辦法是:
1、在maper階段以grade為key,score為value,輸出進入下一階段2、經過shuffle之後,相同grade的資料會發送給同一個reducer
3、然後,我們就可以在reducer中遍歷某個grade的一組values,
4、這一組values對於score來說是無序的,進而需要在reducer中快取這一組values,然後排序從而取到這一組values中的Top n記錄。
Reduce端TreeSet方法進階:
需要說明的是,求Top n,更簡單的方法可以直接用內建的TreeMap或者TreeSet,這兩者是基於紅黑樹的一種資料結構,內部維持key的次序,但每次新增新元素,其排序的開銷要大於堆調整的開銷。例如要找最大的10個元素,那麼建立的是小頂堆。小頂堆的特性是根節點是最小元素。不需要對堆進行再排序,當堆的根節點被替換成新的元素時,需要進行堆化,以保持小頂堆的特性。
案例實現步驟:
以TreeSet方法為例,在maptask階段以grade為key,score為value,分發給reducetask,然後在reducetask階段定義一個TreeSet<Long>集合,存放同個grade的得分,並且只儲存n個,集合大於n時,將最小值的remove出去。
程式碼實現:
import java.io.IOException;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
* 這個MR將會取得每組grade中score最大的前3個
* @author 張恩備
* @date 2016-11-25 下午03:52:53
*/
public class GroupTopK {
public static class GroupTopKMapper extends
Mapper<LongWritable, Text, IntWritable, LongWritable> {
LongWritable outKey = new LongWritable();
LongWritable outValue = new LongWritable();
String[] valArr = null;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
valArr = value.toString().split("\t");
outKey.set(Long.parseInt(valArr[1]));// grade Long
outValue.set(Long.parseLong(valArr[2]));// score long
context.write(outKey, outValue);
}
}
public static class GroupTopKReducer extends
Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
LongWritable outValue = new LongWritable();
public void reduce(LongWritable key, Iterable<LongWritable> values,
Context context) throws IOException, InterruptedException {
TreeSet<Long> scoreTreeSet = new TreeSet<Long>();
for (LongWritable val : values) {
scoreTreeSet.add(val.get());
if (scoreTreeSet.size() > 3) {
scoreTreeSet.remove(scoreTreeSet.first());
}
}
for (Long score : scoreTreeSet) {
outValue.set(score);
context.write(key, outValue);
}
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
System.out.println(otherArgs.length);
System.out.println(otherArgs[0]);
System.out.println(otherArgs[1]);
if (otherArgs.length != 3) {
System.err.println("Usage: GroupTopK <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "GroupTopK");
job.setJarByClass(GroupTopK.class);
job.setMapperClass(GroupTopKMapper.class);
job.setReducerClass(GroupTopKReducer.class);
job.setNumReduceTasks(1);
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
groupingcomparator求最大最小值:
當我們使用第一種方法實現最大最小值時會發現,這個辦法固然可行,但是效率不是很高,因為在reducer中針對一組values取最大最小值,需要在記憶體中進行快取並排序,在資料量大的情況下,會耗費相當多的記憶體空間和cpu運算資源,甚至可能會記憶體溢位。
其實,我們利用自定義groupingcomparator方法來讓reduce階段完成取最值。
groupingcomparator實現步驟:
1、首先定義出包含grade和score的objbean,然後以“grade和score”作為key,可以將map階段讀取到的所有成績資料按照grade分割槽,按照score排序,傳送到reduce,為保證能相同grade落到同一個reduce,還需要修改分割槽規則partition。
2、在reduce端利用groupingcomparator將grade相同的kv聚合成組,然後取第一個即是最大資料
程式碼實現:
定義成績資訊bean
/**
* 成績資訊bean,實現hadoop的序列化機制
* @author 張恩備
*
*/
public class ScoreBean implements WritableComparable<ScoreBean>{
private Text grade;
private DoubleWritable score;
public ScoreBean() {
}
public ScoreBean(Text grade, DoubleWritable score) {
set(grade, score);
}
public void set(Text grade, DoubleWritable score) {
this.grade = grade;
this.score = score;
}
public Text getGrade() {
return grade;
}
public DoubleWritable getScore() {
return score;
}
@Override
public int compareTo(ScoreBean o) {
int cmp = this.grade.compareTo(o.getGrade());
if (cmp == 0) {
cmp = -this.score.compareTo(o.getScore());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(grade.toString());
out.writeDouble(score.get());
}
@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();
this.grade = new Text(readUTF);
this.score= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return grade.toString() + "\t" + score.get();
}
}
自定義partation分片
public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{
@Override
public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {
//相同grade的成績bean,會發往相同的partition
//而且,產生的分割槽數,是會跟使用者設定的reduce task數保持一致
return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
自定義groupingcomparator
/**
* 用於控制shuffle過程中reduce端對kv對的聚合邏輯
* @author 張恩備
*
*/
public class GradeGroupingComparator extends WritableComparator {
protected GradeGroupingComparator() {
super(ScoreBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
ScoreBean abean = (ScoreBean) a;
ScoreBean bbean = (ScoreBean) b;
//將grade相同的bean都視為相同,從而聚合為一組
return abean.getGrade().compareTo(bbean.getGrade());
}
}
編寫mapreduce處理流程/**
* 利用secondarysort機制輸出每個等級下成績最大的記錄
* @author 張恩備
*/
public class SecondarySort {
static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{
ScoreBean bean = new ScoreBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
context.write(bean, NullWritable.get());
}
}
static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{
//在設定了groupingcomparator以後,這裡收到的kv資料 就是: <1001 87.6>,null <1001 76.5>,null ....
//此時,reduce方法中的引數key就是上述kv組中的第一個kv的key:<1001 87.6>
//要輸出同一個grade的所有成績中最大金額的那一個,就只要輸出這個key
@Override
protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(ScoreBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定shuffle所使用的GroupingComparator類
job.setGroupingComparatorClass(GradeGroupingComparator.class);
//指定shuffle所使用的partitioner類
job.setPartitionerClass(GradePartitioner.class);
job.setNumReduceTasks(3);
job.waitForCompletion(true);
}
}
回顧案例,需要自定義這幾個元素:
- 自定義的複合key(保證到達Reduce前完成grade、score排序)
- 自定義的partitioner(保證相同grade能落在同一個reduce端)
- 自定義的GroupingComparator(保證reduce聚合操作時只按照grade對bean聚合交給reduce排序)
瞭解更多推薦:
https://my.oschina.net/leejun2005/blog/135085 ,作者寫的很多文章都有借鑑,再次感謝