MapReduce實戰-求每年最高氣溫
阿新 • 發佈:2020-12-29
1.專案檔案:
2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023
2.原始碼
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper;View Codeimport org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class Temperature { /** * 四個泛型型別分別代表: * KeyIn Mapper的輸入資料的Key,這裡是每行文字的起始位置(0,11,...) * ValueIn Mapper的輸入資料的Value,這裡是每行文字 * KeyOut Mapper的輸出資料的Key,這裡是每行文字中的“年份” * ValueOut Mapper的輸出資料的Value,這裡是每行文字中的“氣溫”*/ static class TempMapper extends Mapper<LongWritable, Text, Text, IntWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 列印樣本: Before Mapper: 0, 2000010115 System.out.print("Before Mapper: " + key + ", " + value); String line = value.toString(); String year = line.substring(0, 4); int temperature = Integer.parseInt(line.substring(8)); context.write(new Text(year), new IntWritable(temperature)); // 列印樣本: After Mapper:2000, 15 System.out.println( "======" + "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature)); } } /** * 四個泛型型別分別代表: * KeyIn Reducer的輸入資料的Key,這裡是每行文字中的“年份” * ValueIn Reducer的輸入資料的Value,這裡是每行文字中的“氣溫” * KeyOut Reducer的輸出資料的Key,這裡是不重複的“年份” * ValueOut Reducer的輸出資料的Value,這裡是這一年中的“最高氣溫” */ static class TempReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; StringBuffer sb = new StringBuffer(); //取values的最大值 for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); sb.append(value).append(", "); } // 列印樣本: Before Reduce: 2000, 15, 23, 99, 12, 22, System.out.print("Before Reduce: " + key + 
", " + sb.toString()); context.write(key, new IntWritable(maxValue)); // 列印樣本: After Reduce: 2000, 99 System.out.println( "======" + "After Reduce: " + key + ", " + maxValue); } } public static void main(String[] args) throws Exception { //輸入路徑 String dst = "hdfs://localhost:9000/intput.txt"; //輸出路徑,必須是不存在的,空檔案加也不行。 String dstOut = "hdfs://localhost:9000/output"; Configuration hadoopConfig = new Configuration(); hadoopConfig.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName() ); hadoopConfig.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName() ); Job job = new Job(hadoopConfig); //如果需要打成jar執行,需要下面這句 //job.setJarByClass(NewMaxTemperature.class); //job執行作業時輸入和輸出檔案的路徑 FileInputFormat.addInputPath(job, new Path(dst)); FileOutputFormat.setOutputPath(job, new Path(dstOut)); //指定自定義的Mapper和Reducer作為兩個階段的任務處理類 job.setMapperClass(TempMapper.class); job.setReducerClass(TempReducer.class); //設定最後輸出結果的Key和Value的型別 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //執行job,直到完成 job.waitForCompletion(true); System.out.println("Finished"); } }
3.執行頁面