1. 程式人生 > >使用hadoop實現平均數~並輸出top N

使用hadoop實現平均數~並輸出top N

    對於求每個學生成績的平均數和top N問題在資料庫中可以通過sql語句就實現出來,這裡就不在做介紹。本文主要通過例項介紹hadoop如何求平均數以及輸出TOP N。

需求描述:

    求檔案中每個學生的平均成績,並將平均成績最高的N個輸出。

資料格式:

     檔案中的一行資料為一門成績記錄,簡化模型結果為“學生唯一標識 成績”,eg: "zs 90",本次測試資料如下圖所示:


需求分析:

     平均值:mapreduce程式中的map函式只簡單處理記錄中的一行資料,輸出結果為 key為學生唯一標識,value為學生的單科成績;ruduce函式中實現對每一個學生的成績求平均值。(之前部落格中有有關於mapreduce程式的輸入輸出問題,就不再作圖分析)

     TOP N:在ruduce中,如果將所有的成績儲存到陣列中,然後排序輸出,這種方式在資料量小的時候還是可行的,但是當資料量非常大的時候,就會造成記憶體溢位,因此這種方式就不可行。基於陣列的思想,所以可以考慮將已經計算的平均值的前N個儲存到長度為N的陣列中,當計算出下一個平均值,將此平均值插入該陣列中,具體演算法如下:

private void addTopN(double avg){
	if (avg > topN[N -1]) {
		int i = 0;
		for (i = 0; i < N && avg < topN[i]; i++);
		if (i < N) {
			for (int j = N-1; j > i; j--) {
				topN[j] = topN[j-1];
			}
			topN[i] = avg;
		}
	}
}
    通過上述方法,topN陣列中就儲存已計算出來的top N值,reduce函式執行完畢後,陣列中就是需求中的top N。

程式碼實現:

    行資料處理類:

 /**  
 *@Description: 成績單一行資料處理    
 */ 
package com.mapreduce.topn;  

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
  
public class Line {
	private String name;//學生唯一標識
	private int score;//成績
	private boolean right = true;
	
	public Line(String line) {
		if (line == null || "".equals(line)) {
			right = false;
			return;
		}
		String []ss = line.split(" ");
		if (ss.length != 2) {
			right = false;
			return;
		}
		name = ss[0];
		try {
			score = Integer.parseInt(ss[1]);
		} catch (Exception e) {
			score = 0;
		}
	}

	public Text getKey() {
		return new Text(name);
	}

	public IntWritable getValue() {
		return new IntWritable(score);
	}

	public boolean isRight() {
		return right;
	} 
}
    mapreduce程式:
 /**  
 *@Description: 平均成績 & Top N    
 */ 
package com.mapreduce.topn;  

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgTop extends Configured implements Tool{
	
	/**
	 *@Description: map函式,輸出的結果為 “學生姓名 成績” eg "zs 90"
	 *@Author:lulei  
	 */
	public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			Line line = new Line(value.toString());
			if (line.isRight()) {
				context.write(line.getKey(), line.getValue());
			}
		}
		
	}
	
	/**
	 *@Description: reduce函式,計算avg & top N
	 *@Author:lulei  
	 */
	public static class Reduce extends Reducer<Text, IntWritable, Text, DoubleWritable> {
		private static double[] topN;
		private static int N = 1;
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			try {
				N = Integer.parseInt(context.getConfiguration().get("N"));
			} catch (Exception e){
				N = 1;
			}
			topN = new double[N]; 
		}
		
		/**
		 * @param avg
		 * @Author:lulei  
		 * @Description: 將avg插入到topN中
		 */
		private void addTopN(double avg){
			if (avg > topN[N -1]) {
				int i = 0;
				for (i = 0; i < N && avg < topN[i]; i++);
				if (i < N) {
					for (int j = N-1; j > i; j--) {
						topN[j] = topN[j-1];
					}
					topN[i] = avg;
				}
			}
		}
		
		/**
		 * @Author:lulei  
		 * @Description: 輸出top N的資料
		 */
		private void print() {
			for (double n : topN){
				System.out.print(n);
				System.out.print("->");
			}
			System.out.println();
		}

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,Context context)
				throws IOException, InterruptedException {
			int count = 0;
			int sum = 0;
			for (IntWritable value : values) {
				count++;
				sum += value.get();
			}
			//計算平均值
			double avg = (sum * 1.0D)/ count;
			//加入top N
			addTopN(avg);
			context.write(key, new DoubleWritable(avg));
		}

		@Override
		protected void cleanup(Context context) throws IOException, InterruptedException {
			//輸出topN
			print(); 
			super.cleanup(context);
		}

	}
	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = new Configuration();
		conf.set("N", arg0[2]);
		@SuppressWarnings("deprecation")
		Job job = new Job(conf);
		job.setJobName("avg&topn");
		job.setInputFormatClass(TextInputFormat.class);
		
		//將輸出設定為TextOutputFormat
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		
		//Mapper Reducer
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		//輸入 輸出路徑
		FileInputFormat.addInputPath(job, new Path(arg0[0]));
		FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
		
		job.waitForCompletion(true);
		
		return job.isSuccessful() ? 0 : 1;
	}
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub  
		if (args.length != 3) {
			System.out.println("hadoop jar **.jar com.mapreduce.topn.AvgTop [input] [output] [N]");
			System.exit(-1);
		}
		try {
			int res = ToolRunner.run(new Configuration(), new AvgTop(), args);
			System.exit(res);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

上傳執行:

    打包jar上傳伺服器,執行命令

hadoop jar avgtop.jar com.mapreduce.topn.AvgTop /root/avgtop/ /out/1 3

   top N輸出結果為:90.0->67.5->56.0->

   輸出結果如下: