Hadoop MapReduce開發--最小最大值
阿新 • 發佈:2019-01-12
測試資料:
file1.txt
102
10
39
109
200
11
2
90
28
file2.txt
5
2
30
838
10005
mapper程式碼:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class MaxMinMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private static final Text KEY = new Text("key"); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(KEY, new IntWritable(Integer.valueOf(value.toString().trim()))); } }
reducer程式碼:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class MaxMinReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { long min = Long.MAX_VALUE; long max = Long.MIN_VALUE; for(IntWritable val : values) { if (min > val.get()) { min = val.get(); } if (max < val.get()) { max = val.get(); } } context.write(new Text("MIN:"), new IntWritable(Integer.valueOf("" + min))); context.write(new Text("MAX:"), new IntWritable(Integer.valueOf("" + max))); } }
main程式碼:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 求最大最小值。資料量不多的情況下,直接把輸入資料做為一個相同key和list<value>形式 */ public class JobMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if(args.length != 2) { System.err.println("Usage: MinMaxTemperature<input path> <output path>"); System.exit(-1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "max_min_job"); job.setJarByClass(Job.class); job.setMapperClass(MaxMinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(MaxMinReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); Path outDirPath = new Path(args[1]); FileSystem fs = FileSystem.get(conf); if(fs.exists(outDirPath)) { fs.delete(outDirPath, true); } FileOutputFormat.setOutputPath(job, outDirPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
結果:
MIN: 2
MAX: 10005