
HDPCD Java Review Notes (17)

From the Java lab booklet.

MRUnit Test
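The lab tests the AverageJob pipeline with MRUnit's three drivers: MapDriver exercises the mapper in isolation, ReduceDriver the reducer in isolation, and MapReduceDriver runs both together with the intermediate sort and grouping, so one test covers the whole map-shuffle-reduce path.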

package average;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

import average.AverageJob.AverageMapper;
import average.AverageJob.AverageReducer;

public class AverageJobTest {
	private MapDriver<LongWritable, Text, Text, Text> mapDriver;
	private ReduceDriver<Text, Text, Text, DoubleWritable> reduceDriver;
	private MapReduceDriver<LongWritable, Text, Text, Text, Text, DoubleWritable> mapReduceDriver;
	
	@Before
	public void setup(){
		AverageMapper mapper = new AverageMapper();
		AverageReducer reducer = new AverageReducer();
		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
	}
	
	@Test
	public void testMapper() throws IOException {
		LongWritable inputKey = new LongWritable(0);
		Text inputValue = new Text(
				"Abbeville, SC,45001,6581,7471, 6787,195278,302280,29673,40460,3042,3294");
		mapDriver.withInput(inputKey, inputValue);
		// field 1 is the state and field 9 the median income; the mapper tags it with a count of 1
		Text outputKey = new Text("SC");
		Text outputValue = new Text("40460,1");
		mapDriver.withOutput(outputKey, outputValue);
		mapDriver.runTest();
	}

	@Test
	public void testReducer() throws IOException {
		Text inputKey = new Text("SC");
		Text inputValue1 = new Text("40468,4");
		Text inputValue2 = new Text("40462,8");
		List<Text> values = new ArrayList<Text>();
		values.add(inputValue1);
		values.add(inputValue2);
		reduceDriver.withInput(inputKey, values);
		Text outputKey = new Text("SC");
		// (40468 + 40462) / (4 + 8) = 80930 / 12
		DoubleWritable outputValue = new DoubleWritable(6744.166666666667);
		reduceDriver.withOutput(outputKey, outputValue);
		reduceDriver.runTest();
	}

	@Test
	public void testMapperReducer() throws IOException {
		LongWritable inputKey = new LongWritable(0);
		Text inputValue = new Text(
				"Abbeville, SC,45001,6581,7471, 6787,195278,302280,29673,40460,3042,3294");
		mapReduceDriver.withInput(inputKey, inputValue);
		Text outputKey = new Text("SC");
		// a single record for SC, so the average is 40460 / 1
		DoubleWritable outputValue = new DoubleWritable(40460.0);
		mapReduceDriver.withOutput(outputKey, outputValue);
		mapReduceDriver.runTest();
	}
}
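The combiner in AverageJob is not exercised by the tests above. Because AverageCombiner reads and writes the same (Text, Text) types, a ReduceDriver can drive it the same way as the reducer; a minimal sketch (my addition, not part of the lab booklet):

	@Test
	public void testCombiner() throws IOException {
		// The combiner emits partial "sum,count" pairs rather than averages,
		// so ("40468,4", "40462,8") combine to "80930,12".
		ReduceDriver<Text, Text, Text, Text> combineDriver =
				ReduceDriver.newReduceDriver(new AverageJob.AverageCombiner());
		List<Text> values = new ArrayList<Text>();
		values.add(new Text("40468,4"));
		values.add(new Text("40462,8"));
		combineDriver.withInput(new Text("SC"), values);
		combineDriver.withOutput(new Text("SC"), new Text("80930,12"));
		combineDriver.runTest();
	}

The job under test, AverageJob: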
package average;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AverageJob extends Configured implements Tool {
	
	public static class AveragePartitioner extends Partitioner<Text, Text> {

		@Override
		public int getPartition(Text key, Text value, int numPartitions) {
			char firstLetter = key.toString().trim().charAt(0);
			return firstLetter % numPartitions;
		}
	}
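
	// Note (my addition, not in the lab booklet): with the 5 reduce tasks configured
	// in run() below, getPartition(new Text("SC"), value, 5) returns 'S' % 5, i.e.
	// 83 % 5 = 3. States are bucketed by the first letter of their abbreviation
	// alone, so letters that are equal mod 5 share a reducer and the partitions are
	// not necessarily balanced.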

	public enum Counters {MAP, COMBINE, REDUCE} 

	public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {
		private Text outputKey = new Text();
		private Text outputValue = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String [] values = value.toString().split(",");
			outputKey.set(values[1].trim());
			outputValue.set(values[9].trim() + ",1");
			context.write(outputKey, outputValue);
			context.getCounter(Counters.MAP).increment(1);
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			System.out.println("MAP counter = " + context.getCounter(Counters.MAP).getValue());
		}
	}

	public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {
		private Text outputValue = new Text();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			long sum = 0;
			int count = 0;
			// Use for-each instead of calling values.iterator() in the loop condition:
			// that pattern only terminates because Hadoop returns the same iterator on
			// every call, and it loops forever on an ordinary Iterable such as a List.
			// Parse with Long.parseLong, since the combiner may re-combine partial sums.
			for (Text value : values) {
				String[] current = value.toString().split(",");
				sum += Long.parseLong(current[0]);
				count += Integer.parseInt(current[1]);
			}
			outputValue.set(sum + "," + count);
			context.write(key, outputValue);
			context.getCounter(Counters.COMBINE).increment(1);
		}		

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			System.out.println("COMBINE counter = " + context.getCounter(Counters.COMBINE).getValue());
		}
	}
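
	// Why the combiner emits "sum,count" instead of an average: Hadoop may apply a
	// combiner zero, one, or several times, so its output must merge correctly.
	// An average of averages is not the average of the raw values, but partial sums
	// and counts always add up. Note also that a combiner's output types must match
	// the map output types, which is why this class emits (Text, Text).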

	public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {
		DoubleWritable outputValue = new DoubleWritable();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			double sum = 0.0;
			int count = 0;
			// Same for-each fix as in the combiner above.
			for (Text value : values) {
				String[] current = value.toString().split(",");
				sum += Long.parseLong(current[0]);
				count += Integer.parseInt(current[1]);
			}
			outputValue.set(sum/count);
			context.write(key, outputValue);
			context.getCounter(Counters.REDUCE).increment(1);
		}

		@Override
		protected void cleanup(Context context)
				throws IOException, InterruptedException {
			System.out.println("REDUCE counter = " + context.getCounter(Counters.REDUCE).getValue());
		}
	}	

	@Override
	public int run(String[] arg0) throws Exception {
		Configuration conf = super.getConf();
		Job job = Job.getInstance(conf, "AverageJob");
		job.setJarByClass(AverageJob.class);

		// Input and output are hard-coded to the lab's "counties" directory;
		// delete any previous output so the job can be re-run.
		Path out = new Path("counties/output");
		out.getFileSystem(conf).delete(out, true);
		FileInputFormat.setInputPaths(job, "counties");
		FileOutputFormat.setOutputPath(job, out);

		job.setMapperClass(AverageMapper.class);
		job.setReducerClass(AverageReducer.class);
		job.setCombinerClass(AverageCombiner.class);
		job.setPartitionerClass(AveragePartitioner.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		// Map output is (Text, Text) but the reducer emits (Text, DoubleWritable),
		// so the map and job output types must be declared separately.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		job.setNumReduceTasks(5);

		return job.waitForCompletion(true)?0:1;
	}


	public static void main(String[] args) {
		int result = 1; // non-zero default so an exception still exits with failure
		try {
			result = ToolRunner.run(new Configuration(), new AverageJob(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.exit(result);
	}
}
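
To run the job on a cluster, package the classes into a JAR (the name average.jar below is my assumption) and launch it with the standard hadoop jar command; since the job goes through ToolRunner, -D configuration overrides can be passed ahead of any program arguments:

hadoop jar average.jar average.AverageJob

No path arguments are needed because run() hard-codes the counties input directory and the counties/output output directory.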