HDPCD-Java Review Notes (17)
阿新 · Published 2018-12-26
Source: Java lab booklet
MRUnit Test
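MRUnit lets you unit-test MapReduce code without a cluster: MapDriver feeds a single record to the mapper alone, ReduceDriver feeds a key and a list of values to the reducer alone, and MapReduceDriver wires the two together, including the sort/shuffle between them. The tests below cover all three for the AverageJob defined afterwards; note that each driver's generic parameters must match the mapper's and reducer's input/output types exactly. Running them requires the MRUnit artifact on the test classpath (typically org.apache.mrunit:mrunit with the hadoop2 classifier for the new API).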
package average;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;
import average.AverageJob.AverageMapper;
import average.AverageJob.AverageReducer;
public class AverageJobTest {

    private MapDriver<LongWritable, Text, Text, Text> mapDriver;
    private ReduceDriver<Text, Text, Text, DoubleWritable> reduceDriver;
    private MapReduceDriver<LongWritable, Text, Text, Text, Text, DoubleWritable> mapReduceDriver;

    @Before
    public void setup() {
        AverageMapper mapper = new AverageMapper();
        AverageReducer reducer = new AverageReducer();
        mapDriver = MapDriver.newMapDriver(mapper);
        reduceDriver = ReduceDriver.newReduceDriver(reducer);
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
    }

    @Test
    public void testMapper() throws IOException {
        // Field 1 of the CSV record is the state, field 9 is the value to average.
        // Declaring throws IOException (instead of catching and printing) lets a
        // framework error fail the test instead of being silently swallowed.
        LongWritable inputKey = new LongWritable(0);
        Text inputValue = new Text("Abbeville, SC,45001,6581,7471, 6787,195278,302280,29673,40460 ,3042,3294");
        mapDriver.withInput(inputKey, inputValue);
        Text outputKey = new Text("SC");
        Text outputValue = new Text("40460,1");
        mapDriver.withOutput(outputKey, outputValue);
        mapDriver.runTest();
    }

    @Test
    public void testReducer() throws IOException {
        Text inputKey = new Text("SC");
        Text inputValue1 = new Text("40468,4");
        Text inputValue2 = new Text("40462,8");
        List<Text> values = new ArrayList<Text>();
        values.add(inputValue1);
        values.add(inputValue2);
        reduceDriver.withInput(inputKey, values);
        Text outputKey = new Text("SC");
        // (40468 + 40462) / (4 + 8) = 80930 / 12
        DoubleWritable outputValue = new DoubleWritable(6744.166666666667);
        reduceDriver.withOutput(outputKey, outputValue);
        reduceDriver.runTest();
    }

    @Test
    public void testMapperReducer() throws IOException {
        LongWritable inputKey = new LongWritable(0);
        Text inputValue = new Text("Abbeville, SC ,45001,6581,7471, 6787,195278,302280,29673,40460,3042,3294");
        mapReduceDriver.withInput(inputKey, inputValue);
        Text outputKey = new Text("SC");
        // A single value with count 1 averages to itself.
        DoubleWritable outputValue = new DoubleWritable(40460.0);
        mapReduceDriver.withOutput(outputKey, outputValue);
        mapReduceDriver.runTest();
    }
}
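One gap in the suite above: the job configures an AverageCombiner (see below), but no test exercises it. Because the combiner is itself a Reducer with (Text, Text) output, it can be driven with its own ReduceDriver, exactly like the reducer test. A minimal sketch of such a test, using only the MRUnit calls already seen above (the class name AverageCombinerTest and the input figures are illustrative, not part of the lab code):

package average;

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

import average.AverageJob.AverageCombiner;

public class AverageCombinerTest {

    @Test
    public void testCombiner() throws IOException {
        // The combiner folds partial (sum,count) pairs into one pair per key.
        ReduceDriver<Text, Text, Text, Text> combineDriver =
                ReduceDriver.newReduceDriver(new AverageCombiner());
        combineDriver.withInput(new Text("SC"),
                Arrays.asList(new Text("40460,1"), new Text("30000,2")));
        // 40460 + 30000 = 70460 over 1 + 2 = 3 records
        combineDriver.withOutput(new Text("SC"), new Text("70460,3"));
        combineDriver.runTest();
    }
}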
package average;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AverageJob extends Configured implements Tool {

    // Routes each key to a reducer based on its first character.
    public static class AveragePartitioner extends Partitioner<Text, Text> {
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            char firstLetter = key.toString().trim().charAt(0);
            return firstLetter % numPartitions;
        }
    }

    public enum Counters { MAP, COMBINE, REDUCE }

    public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text> {

        private Text outputKey = new Text();
        private Text outputValue = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Field 1 is the state abbreviation, field 9 the value to average;
            // emit (state, "value,1") so downstream steps can sum both parts.
            String[] values = value.toString().split(",");
            outputKey.set(values[1].trim());
            outputValue.set(values[9].trim() + ",1");
            context.write(outputKey, outputValue);
            context.getCounter(Counters.MAP).increment(1);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("MAP counter = " + context.getCounter(Counters.MAP).getValue());
        }
    }

    // Pre-aggregates (sum,count) pairs per key so less data crosses the network.
    public static class AverageCombiner extends Reducer<Text, Text, Text, Text> {

        private Text outputValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            int count = 0;
            // A for-each loop iterates the values once; calling values.iterator()
            // inside the loop condition, as the original did, only works because
            // Hadoop happens to return the same iterator each time.
            for (Text value : values) {
                String[] current = value.toString().split(",");
                // Long.parseLong: a combiner may run on its own output, so the
                // partial sum can exceed the int range.
                sum += Long.parseLong(current[0]);
                count += Integer.parseInt(current[1]);
            }
            outputValue.set(sum + "," + count);
            context.write(key, outputValue);
            context.getCounter(Counters.COMBINE).increment(1);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("COMBINE counter = " + context.getCounter(Counters.COMBINE).getValue());
        }
    }

    public static class AverageReducer extends Reducer<Text, Text, Text, DoubleWritable> {

        DoubleWritable outputValue = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0.0;
            int count = 0;
            for (Text value : values) {
                String[] current = value.toString().split(",");
                sum += Long.parseLong(current[0]);
                count += Integer.parseInt(current[1]);
            }
            outputValue.set(sum / count);
            context.write(key, outputValue);
            context.getCounter(Counters.REDUCE).increment(1);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            System.out.println("REDUCE counter = " + context.getCounter(Counters.REDUCE).getValue());
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "AverageJob");
        job.setJarByClass(AverageJob.class);

        // Delete any previous output so reruns do not fail.
        Path out = new Path("counties/output");
        out.getFileSystem(conf).delete(out, true);
        FileInputFormat.setInputPaths(job, "counties");
        FileOutputFormat.setOutputPath(job, out);

        job.setMapperClass(AverageMapper.class);
        job.setReducerClass(AverageReducer.class);
        job.setCombinerClass(AverageCombiner.class);
        job.setPartitionerClass(AveragePartitioner.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map output is (Text, Text); the reducer emits (Text, DoubleWritable).
        // The original set both output classes to Text, which only worked because
        // TextOutputFormat never checks the value type.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        job.setNumReduceTasks(5);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        int result = 1; // non-zero unless the job succeeds
        try {
            result = ToolRunner.run(new Configuration(), new AverageJob(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(result);
    }
}
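The AveragePartitioner is worth a quick sanity check too: it partitions on the key's first character modulo the reducer count, so with the job's 5 reducers the key SC lands in partition 83 % 5 = 3 ('S' has code point 83). A small hypothetical check outside of MapReduce (PartitionerCheck is not part of the lab code):

package average;

import org.apache.hadoop.io.Text;

import average.AverageJob.AveragePartitioner;

public class PartitionerCheck {

    public static void main(String[] args) {
        AveragePartitioner partitioner = new AveragePartitioner();
        // 'S' == 83, so with 5 reducers "SC" maps to 83 % 5 = 3.
        int partition = partitioner.getPartition(new Text("SC"), new Text("40460,1"), 5);
        System.out.println("SC -> partition " + partition); // prints 3
    }
}

With only 5 partitions, several first letters necessarily share a reducer ('S' and 'X' both map to 3, for example), so an uneven distribution of states across reducers is expected behavior for this partitioner.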