1. 程式人生 > >分組Top N問題(二)

分組Top N問題(二)

前言:

在Hadoop中,排序是MapReduce的靈魂,MapTask和ReduceTask均會對資料按Key排序,這個操作是MR框架的預設行為,不管你的業務邏輯上是否需要這一操作。

技術點:

MapReduce框架中,用到的排序主要有兩種:快速排序和基於堆實現的優先順序佇列(PriorityQueue)。

Mapper階段: 

從map輸出到環形緩衝區的資料會被排序(這是MR框架中改良的快速排序),這個排序涉及partition和key,當緩衝區容量佔用80%,會spill資料到磁碟,生成IFile檔案,Map結束後,會將IFile檔案排序合併成一個大檔案(基於堆實現的優先順序佇列),以供不同的reduce來拉取相應的資料。

Reducer階段: 

從Mapper端取回的資料已是部分有序,Reduce Task只需進行一次歸併排序即可保證資料整體有序。為了提高效率,Hadoop將sort階段和reduce階段並行化,在sort階段,Reduce Task為記憶體和磁碟中的檔案建立了小頂堆,儲存了指向該小頂堆根節點的迭代器,並不斷的移動迭代器,以將key相同的資料順次交給reduce()函式處理,期間移動迭代器的過程實際上就是不斷調整小頂堆的過程(建堆→取堆頂元素→重新建堆→取堆頂元素...),這樣,sort和reduce可以並行進行。

分組Top n分析:

在資料處理中,經常會碰到這樣一個場景,對錶資料按照某一欄位分組,然後找出各自組內最大的幾條記錄情形。針對這種分組Top N問題,我們利用Hive、MapReduce等多種工具實現一下。

場景模擬:

對類如下users表記錄,取出不同grade下得分最多的兩條記錄

id	grade	score
1	  A    	10
2	  A   	40
3	  B   	30
4	  C   	20
5	  B   	10
6	  D   	40
7	  A   	30
8	  C   	20
9	  B   	10
10	  D   	40
11	  C   	30
12	  D   	20

最簡單的辦法是:

1、在maper階段以grade為key,score為value,輸出進入下一階段
2、經過shuffle之後,相同grade的資料會發送給同一個reducer
3、然後,我們就可以在reducer中遍歷某個grade的一組values,
4、這一組values對於score來說是無序的,進而需要在reducer中快取這一組values,然後排序從而取到這一組values中的Top n記錄。

Reduce端TreeSet方法進階:

需要說明的是,求Top n,更簡單的方法可以直接用內建的TreeMap或者TreeSet,這兩者是基於紅黑樹的一種資料結構,內部維持key的次序,但每次新增新元素,其排序的開銷要大於堆調整的開銷。例如要找最大的10個元素,那麼建立的是小頂堆。小頂堆的特性是根節點是最小元素。不需要對堆進行再排序,當堆的根節點被替換成新的元素時,需要進行堆化,以保持小頂堆的特性。

案例實現步驟:

以TreeSet方法為例,在maptask階段以grade為key,score為value,分發給reducetask,然後在reducetask階段定義一個TreeSet<Long>集合,存放同個grade的得分,並且只儲存n個,集合大於n時,將最小值的remove出去。

程式碼實現:

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * 這個MR將會取得每組grade中score最大的前3個
 * @author 張恩備
 * @date 2016-11-25 下午03:52:53
 */
public class GroupTopK {
	public static class GroupTopKMapper extends
			Mapper<LongWritable, Text, IntWritable, LongWritable> {
		LongWritable outKey = new LongWritable();
		LongWritable outValue = new LongWritable();
		String[] valArr = null;

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			valArr = value.toString().split("\t");
			outKey.set(Long.parseInt(valArr[1]));// grade Long
			outValue.set(Long.parseLong(valArr[2]));// score long
			context.write(outKey, outValue);
		}
	}

	public static class GroupTopKReducer extends
			Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

		LongWritable outValue = new LongWritable();

		public void reduce(LongWritable key, Iterable<LongWritable> values,
				Context context) throws IOException, InterruptedException {
			TreeSet<Long> scoreTreeSet = new TreeSet<Long>();
			for (LongWritable val : values) {
				scoreTreeSet.add(val.get());
				if (scoreTreeSet.size() > 3) {
					scoreTreeSet.remove(scoreTreeSet.first());
				}
			}
			for (Long score : scoreTreeSet) {
				outValue.set(score);
				context.write(key, outValue);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();

		System.out.println(otherArgs.length);
		System.out.println(otherArgs[0]);
		System.out.println(otherArgs[1]);

		if (otherArgs.length != 3) {
			System.err.println("Usage: GroupTopK <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "GroupTopK");
		job.setJarByClass(GroupTopK.class);
		job.setMapperClass(GroupTopKMapper.class);
		job.setReducerClass(GroupTopKReducer.class);
		job.setNumReduceTasks(1);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

groupingcomparator求最大最小值:

當我們使用第一種方法實現最大最小值時會發現,這個辦法固然可行,但是效率不是很高,因為在reducer中針對一組values取最大最小值,需要在記憶體中進行快取並排序,在資料量大的情況下,會耗費相當多的記憶體空間和cpu運算資源,甚至可能會記憶體溢位。

其實,我們利用自定義groupingcomparator方法來讓reduce階段完成取最值。

groupingcomparator實現步驟:

1、首先定義出包含grade和score的objbean,然後以“grade和score”作為key,可以將map階段讀取到的所有成績資料按照grade分割槽,按照score排序,傳送到reduce,為保證能相同grade落到同一個reduce,還需要修改分割槽規則partition。

2、在reduce端利用groupingcomparator將grade相同的kv聚合成組,然後取第一個即是最大資料

程式碼實現:

定義成績資訊bean

/**
 * 成績資訊bean,實現hadoop的序列化機制
 * @author 張恩備
 *
 */
public class ScoreBean implements WritableComparable<ScoreBean>{
	private Text grade;
	private DoubleWritable score;

	public ScoreBean() {
	}
	public ScoreBean(Text grade, DoubleWritable score) {
		set(grade, score);
	}

	public void set(Text grade, DoubleWritable score) {

		this.grade = grade;
		this.score = score;

	}

	public Text getGrade() {
		return grade;
	}

	public DoubleWritable getScore() {
		return score;
	}

	@Override
	public int compareTo(ScoreBean o) {
		int cmp = this.grade.compareTo(o.getGrade());
		if (cmp == 0) {

			cmp = -this.score.compareTo(o.getScore());
		}
		return cmp;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(grade.toString());
		out.writeDouble(score.get());
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		String readUTF = in.readUTF();
		double readDouble = in.readDouble();
		
		this.grade = new Text(readUTF);
		this.score= new DoubleWritable(readDouble);
	}


	@Override
	public String toString() {
		return grade.toString() + "\t" + score.get();
	}
}

自定義partation分片

public class GradePartitioner extends Partitioner<ScoreBean, NullWritable>{

	@Override
	public int getPartition(ScoreBean bean, NullWritable value, int numReduceTasks) {
		//相同grade的成績bean,會發往相同的partition
		//而且,產生的分割槽數,是會跟使用者設定的reduce task數保持一致
		return (bean.getGrade().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
		
	}

}

自定義groupingcomparator

/**
 * 用於控制shuffle過程中reduce端對kv對的聚合邏輯
 * @author 張恩備
 *
 */
public class GradeGroupingComparator extends WritableComparator {

	protected GradeGroupingComparator() {

		super(ScoreBean.class, true);
	}
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		ScoreBean abean = (ScoreBean) a;
		ScoreBean bbean = (ScoreBean) b;
		
		//將grade相同的bean都視為相同,從而聚合為一組
		return abean.getGrade().compareTo(bbean.getGrade());
	}
}
編寫mapreduce處理流程
/**
 * 利用secondarysort機制輸出每個等級下成績最大的記錄
 * @author 張恩備
 */
public class SecondarySort {
	
	static class SecondarySortMapper extends Mapper<LongWritable, Text, ScoreBean, NullWritable>{
		
		ScoreBean bean = new ScoreBean();
		
		@Override
		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

			String line = value.toString();
			String[] fields = StringUtils.split(line, "\t");
			
			bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
			
			context.write(bean, NullWritable.get());
			
		}
		
	}
	
	static class SecondarySortReducer extends Reducer<ScoreBean, NullWritable, ScoreBean, NullWritable>{
		
		
		//在設定了groupingcomparator以後,這裡收到的kv資料 就是:  <1001 87.6>,null  <1001 76.5>,null  .... 
		//此時,reduce方法中的引數key就是上述kv組中的第一個kv的key:<1001 87.6>
		//要輸出同一個grade的所有成績中最大金額的那一個,就只要輸出這個key
		@Override
		protected void reduce(ScoreBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
			context.write(key, NullWritable.get());
		}
	}
	
	
	public static void main(String[] args) throws Exception {
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(SecondarySort.class);
		
		job.setMapperClass(SecondarySortMapper.class);
		job.setReducerClass(SecondarySortReducer.class);
		
		
		job.setOutputKeyClass(ScoreBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//指定shuffle所使用的GroupingComparator類
		job.setGroupingComparatorClass(GradeGroupingComparator.class);
		//指定shuffle所使用的partitioner類
		job.setPartitionerClass(GradePartitioner.class);
		
		job.setNumReduceTasks(3);
		
		job.waitForCompletion(true);
		
	}

}

回顧案例,需要自定義這幾個元素:

  1. 自定義的複合key(保證到達Reduce前完成grade、score排序)
  2. 自定義的partitioner(保證相同grade能落在同一個reduce端)
  3. 自定義的GroupingComparator(保證reduce聚合操作時只按照grade對bean聚合交給reduce排序)

瞭解更多推薦:

https://my.oschina.net/leejun2005/blog/135085 ,作者寫的很多文章都有借鑑,再次感謝