MapReduce程序之求一年中的最高溫度和最低溫度
阿新 • • 發佈:2018-03-06
大數據 Hadoop MapReduce Java [TOC]
MapReduce程序之求一年中的最高溫度和最低溫度
前言
看過《Hadoop權威指南》的同學都知道,關於MapReduce的第一個入門的例子就是統計全球氣溫,書上的例子是使用了全部的數據來作為統計,但實際上只需要拿某一年的數據來作為測試也就OK了,所以下面寫的程序用的數據是某一年的氣溫數據。
數據獲取與說明
可以在下面的網址中下載到全部的數據:
ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
同時這個網址也有提供關於數據每個字段的說明,也就是readme.txt文件,因為我們關註的是氣溫的最大與最小值,所以只需要查看相關的說明即可,其關於氣溫最值說明如下:
MAX 103-108 Real Maximum temperature reported during the day in Fahrenheit to tenths--time of max temp report varies by country and region, so this will sometimes not be the max for the calendar day. Missing = 9999.9 MIN 111-116 Real Minimum temperature reported during the day in Fahrenheit to tenths--time of min temp report varies by country and region, so this will sometimes not be the min for the calendar day. Missing = 9999.9
也就是說,每行的第103-108個字符為當天的最高氣溫,第111-116個字符為當天的最低氣溫,基於此就可以寫出我們的MapReduce程序了。
程序思路
/** 數據源:ftp://ftp.ncdc.noaa.gov/pub/data/gsod/ 求出一年中的最高溫度和最低溫度。 * MR應用程序 * * Map<k1, v1, k2, v2> * 第一步:確定map的類型參數 * k1, v1是map函數的輸入參數 * k2, v2是map函數的輸出參數 * 對於普通的文本文件的每一行的起始偏移量就是k1,---->Long(LongWritable) * 對於普通的文本文件,v2就是其中的一行數據,是k1所對應的一行數據,---->String(Text) * k2, v2 * k2就是拆分後的單詞,---->String(Text) * v2就是溫度---->double(DoubleWritable) * 第二步:編寫一個類繼承Mapper * 復寫其中的map函數 * Reduce<k2, v2s, k3, v3> * 第一步:確定reduce的類型 * k2, v2s是reduce函數的輸入參數 * k3, v3是reduce函數的輸出參數 * k2 --->Text * v2s ---->Iterable<DoubleWritable> * * k3 聚合之後的單詞---->Text * v3 聚合之後的單詞對應的次數--->DoubleWritable 第二步:編寫一個類繼承Reducer * 復寫其中的reduce函數 * * * 第三步:編寫完map和reduce之後,將二者通過驅動程序組裝起來,進行執行 * * * mr的執行的方式: * yarn/hadoop jar jar的路徑 全類名 參數 */
MapReduce程序
根據程序思路和數據格式,寫出的MapReduce程序如下:
package com.uplooking.bigdata.mr.weather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import java.io.IOException; public class WeatherJob { public static void main(String[] args) throws Exception { if (args == null || args.length < 2) { System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>"); System.exit(-1); } Path inputPath = new Path(args[0]); Path outputPath = new Path(args[1]); Configuration conf = new Configuration(); String jobName = WeatherJob.class.getSimpleName(); Job job = Job.getInstance(conf, jobName); //設置job運行的jar job.setJarByClass(WeatherJob.class); //設置整個程序的輸入 FileInputFormat.setInputPaths(job, inputPath); job.setInputFormatClass(TextInputFormat.class);//就是設置如何將輸入文件解析成一行一行內容的解析類 //設置mapper job.setMapperClass(WeatherMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DoubleWritable.class); //設置整個程序的輸出 // outputpath.getFileSystem(conf).delete(outputpath, true);//如果當前輸出目錄存在,刪除之,以避免.FileAlreadyExistsException FileOutputFormat.setOutputPath(job, outputPath); job.setOutputFormatClass(TextOutputFormat.class); //設置reducer job.setReducerClass(WeatherReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DoubleWritable.class); //指定程序有幾個reducer去運行 job.setNumReduceTasks(1); //提交程序 job.waitForCompletion(true); } public static class WeatherMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String line = v1.toString(); Double max = null; Double min = null; try { // 獲取一行中的氣溫MAX值 max = Double.parseDouble(line.substring(103, 108)); // 獲取一行中的氣溫MIN值 min = Double.parseDouble(line.substring(111, 116)); } catch (NumberFormatException e) { // 如果出現異常,則當前的這一個map task不執行,直接返回 return; } // 寫到context中 context.write(new Text("MAX"), new DoubleWritable(max)); context.write(new Text("MIN"), new DoubleWritable(min)); } } public static class WeatherReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> { @Override protected void reduce(Text k2, Iterable<DoubleWritable> v2s, Context context) throws IOException, InterruptedException { // 先預定義最大和最小氣溫值 double max = Double.MIN_VALUE; double min = Double.MAX_VALUE; // 得到叠代列表中的氣溫最大值和最小值 if ("MAX".equals(k2.toString())) { for (DoubleWritable v2 : v2s) { double tmp = v2.get(); if (tmp > max) { max = tmp; } } } else { for (DoubleWritable v2 : v2s) { double tmp = v2.get(); if (tmp < min) { min = tmp; } } } // 將結果寫入到context中 context.write(k2, "MAX".equals(k2.toString()) ? new DoubleWritable(max) : new DoubleWritable(min)); } } }
測試
註意,上面的程序是需要讀取命令行的參數輸入的,可以在本地的環境執行,也可以打包成一個jar包上傳到Hadoop環境的Linux服務器上執行,這裏,我使用的是本地環境(我的操作系統是Mac OS),輸入的參數如下:
/Users/yeyonghao/data/input/010010-99999-2015.op /Users/yeyonghao/data/output/mr/weather/w-1
執行程序,輸出結果如下:
...省略部分輸出...
2018-03-06 11:23:14,915 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local2102200632_0001 running in uber mode : false
2018-03-06 11:23:14,917 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - map 100% reduce 100%
2018-03-06 11:23:14,918 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local2102200632_0001 completed successfully
2018-03-06 11:23:14,927 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Counters: 30
...省略部分輸出...
MapReduce程序執行成功後,再查看輸出目錄中的輸出結果:
yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/weather/w-1$ cat part-r-00000
MAX 57.6
MIN 6.3
可以看到,已經可以正確統計出最大氣溫和最小氣溫值,說明我們的MapReduce程序沒有問題。
MapReduce程序之求一年中的最高溫度和最低溫度