Optimizing MapReduce with the Tool Interface
阿新 · Published 2018-12-13
Extend Configured and implement Tool. Instead of creating a new Configuration inside the run() method, obtain it through getConf(), and launch the job by calling run() via ToolRunner.
package com.kfk.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class WordCountUpMR extends Configured implements Tool {

    // 1. map
    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Text mapOutputKey = new Text();
        private IntWritable mapOutputValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each line on spaces and emit <word, 1> for every token.
            String lineValue = value.toString();
            String[] strs = lineValue.split(" ");
            for (String str : strs) {
                mapOutputKey.set(str);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    // 2. reduce
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable outputValue = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for each word.
            int count = 0;
            for (IntWritable val : values) {
                count += val.get();
            }
            outputValue.set(count);
            context.write(key, outputValue);
        }
    }

    // 3. driver
    public int run(String[] args) throws Exception {

        // 1) get conf (supplied by ToolRunner through Configured, not created here)
        Configuration configuration = this.getConf();

        // 2) create job
        Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());

        // 3.1) input
        Path path = new Path(args[0]);
        FileInputFormat.addInputPath(job, path);

        // 3.2) map
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 3.3) reduce
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3.4) output
        Path output = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, output);

        // 3.5) submit and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) {
        args = new String[]{
                "/user/kfk/data/wc",
                "/user/kfk/data/output1"
        };

        Configuration configuration = new Configuration();
        try {
            // Delete the output directory if it already exists so the job can be rerun.
            Path fileOutPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(configuration);
            if (fileSystem.exists(fileOutPath)) {
                fileSystem.delete(fileOutPath, true);
            }

            // Launch through ToolRunner so generic options are parsed into the Configuration.
            int status = ToolRunner.run(configuration, new WordCountUpMR(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
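The payoff of this pattern is that ToolRunner passes the command-line arguments through GenericOptionsParser before calling run(), so generic options such as -D key=value, -conf, or -files land in the Configuration returned by getConf() without any extra parsing code in the driver. Below is a minimal sketch (not from the original post; the class name ConfEchoTool is made up for illustration) showing that a -D property passed on the command line becomes visible through getConf():

package com.kfk.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical demo class: prints a property that was supplied as a generic option.
public class ConfEchoTool extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        // getConf() already contains any -D properties from the command line,
        // because ToolRunner/GenericOptionsParser consumed them before calling run().
        Configuration conf = this.getConf();
        System.out.println("mapreduce.job.reduces = " + conf.get("mapreduce.job.reduces"));
        System.out.println("remaining args: " + args.length);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // e.g. args = {"-D", "mapreduce.job.reduces=2", "/user/kfk/data/wc"}
        System.exit(ToolRunner.run(new Configuration(), new ConfEchoTool(), args));
    }
}

For the word-count job above, the same mechanism would let you tune it at submit time, for example with something like hadoop jar wordcount.jar com.kfk.hadoop.mr.WordCountUpMR -D mapreduce.job.reduces=2 /user/kfk/data/wc /user/kfk/data/output1 (the jar name is assumed, and the hard-coded args in main would have to be removed for the command-line paths to take effect).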