1. 程式人生 > >MapReduce程序之求一年中的最高溫度和最低溫度

MapReduce程序之求一年中的最高溫度和最低溫度

大數據 Hadoop MapReduce Java

[TOC]


MapReduce程序之求一年中的最高溫度和最低溫度

前言

看過《Hadoop權威指南》的同學都知道,關於MapReduce的第一個入門的例子就是統計全球氣溫,書上的例子是使用了全部的數據來作為統計,但實際上只需要拿某一年的數據來作為測試也就OK了,所以下面寫的程序用的數據是某一年的氣溫數據。

數據獲取與說明

可以在下面的網址中下載到全部的數據:

ftp://ftp.ncdc.noaa.gov/pub/data/gsod/

同時這個網址也有提供關於數據每個字段的說明,也就是readme.txt文件,因為我們關註的是氣溫的最大與最小值,所以只需要查看相關的說明即可,其關於氣溫最值說明如下:

MAX     103-108   Real   Maximum temperature reported during the 
                         day in Fahrenheit to tenths--time of max 
                         temp report varies by country and        
                         region, so this will sometimes not be    
                         the max for the calendar day.  Missing = 
                         9999.9   

MIN     111-116   Real   Minimum temperature reported during the 
                         day in Fahrenheit to tenths--time of min 
                         temp report varies by country and        
                         region, so this will sometimes not be  
                         the min for the calendar day.  Missing = 
                         9999.9

也就是說,每行的第103-108個字符為當天的最高氣溫,第111-116個字符為當天的最低氣溫,基於此就可以寫出我們的MapReduce程序了。

程序思路

/**
    數據源:ftp://ftp.ncdc.noaa.gov/pub/data/gsod/
    求出一年中的最高溫度和最低溫度。

 * MR應用程序
 *
 *   Map<k1, v1, k2, v2>
 *   第一步:確定map的類型參數
 *      k1, v1是map函數的輸入參數
 *      k2, v2是map函數的輸出參數
 *      對於普通的文本文件的每一行的起始偏移量就是k1,---->Long(LongWritable)
 *      對於普通的文本文件,v2就是其中的一行數據,是k1所對應的一行數據,---->String(Text)
 *      k2, v2
 *          k2就是拆分後的單詞,---->String(Text)
 *          v2就是溫度---->double(DoubleWritable)
 *   第二步:編寫一個類繼承Mapper
 *      復寫其中的map函數
 *   Reduce<k2, v2s, k3, v3>
 *    第一步:確定reduce的類型
 *      k2, v2s是reduce函數的輸入參數
 *      k3, v3是reduce函數的輸出參數
 *      k2  --->Text
 *      v2s ---->Iterable<DoubleWritable>
 *
 *      k3 聚合之後的單詞---->Text
 *      v3 聚合之後的單詞對應的次數--->DoubleWritable
 第二步:編寫一個類繼承Reducer
 *      復寫其中的reduce函數
 *
 *
 *  第三步:編寫完map和reduce之後,將二者通過驅動程序組裝起來,進行執行
 *
 *
 *  mr的執行的方式:
 *  yarn/hadoop jar jar的路徑 全類名 參數
 */

MapReduce程序

根據程序思路和數據格式,寫出的MapReduce程序如下:

package com.uplooking.bigdata.mr.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class WeatherJob {

    public static void main(String[] args) throws Exception {
        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = new Configuration();
        String jobName = WeatherJob.class.getSimpleName();
        Job job = Job.getInstance(conf, jobName);
        //設置job運行的jar
        job.setJarByClass(WeatherJob.class);
        //設置整個程序的輸入
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);//就是設置如何將輸入文件解析成一行一行內容的解析類
        //設置mapper
        job.setMapperClass(WeatherMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        //設置整個程序的輸出
        // outputpath.getFileSystem(conf).delete(outputpath, true);//如果當前輸出目錄存在,刪除之,以避免.FileAlreadyExistsException
        FileOutputFormat.setOutputPath(job, outputPath);
        job.setOutputFormatClass(TextOutputFormat.class);
        //設置reducer
        job.setReducerClass(WeatherReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        //指定程序有幾個reducer去運行
        job.setNumReduceTasks(1);
        //提交程序
        job.waitForCompletion(true);
    }

    public static class WeatherMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            String line = v1.toString();
            Double max = null;
            Double min = null;
            try {
                // 獲取一行中的氣溫MAX值
                max = Double.parseDouble(line.substring(103, 108));
                // 獲取一行中的氣溫MIN值
                min = Double.parseDouble(line.substring(111, 116));
            } catch (NumberFormatException e) {
                // 如果出現異常,則當前的這一個map task不執行,直接返回
                return;
            }
            // 寫到context中
            context.write(new Text("MAX"), new DoubleWritable(max));
            context.write(new Text("MIN"), new DoubleWritable(min));
        }
    }

    public static class WeatherReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text k2, Iterable<DoubleWritable> v2s, Context context) throws IOException, InterruptedException {
            // 先預定義最大和最小氣溫值
            double max = Double.MIN_VALUE;
            double min = Double.MAX_VALUE;
            // 得到叠代列表中的氣溫最大值和最小值
            if ("MAX".equals(k2.toString())) {
                for (DoubleWritable v2 : v2s) {
                    double tmp = v2.get();
                    if (tmp > max) {
                        max = tmp;
                    }
                }
            } else {
                for (DoubleWritable v2 : v2s) {
                    double tmp = v2.get();
                    if (tmp < min) {
                        min = tmp;
                    }
                }
            }
            // 將結果寫入到context中
            context.write(k2, "MAX".equals(k2.toString()) ? new DoubleWritable(max) : new DoubleWritable(min));
        }
    }
}

測試

註意,上面的程序是需要讀取命令行的參數輸入的,可以在本地的環境執行,也可以打包成一個jar包上傳到Hadoop環境的Linux服務器上執行,這裏,我使用的是本地環境(我的操作系統是Mac OS),輸入的參數如下:

/Users/yeyonghao/data/input/010010-99999-2015.op /Users/yeyonghao/data/output/mr/weather/w-1

執行程序,輸出結果如下:

...省略部分輸出...
2018-03-06 11:23:14,915 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local2102200632_0001 running in uber mode : false
2018-03-06 11:23:14,917 [main] [org.apache.hadoop.mapreduce.Job] [INFO] -  map 100% reduce 100%
2018-03-06 11:23:14,918 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Job job_local2102200632_0001 completed successfully
2018-03-06 11:23:14,927 [main] [org.apache.hadoop.mapreduce.Job] [INFO] - Counters: 30
...省略部分輸出...

MapReduce程序執行成功後,再查看輸出目錄中的輸出結果:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/weather/w-1$ cat part-r-00000
MAX 57.6
MIN 6.3

可以看到,已經可以正確統計出最大氣溫和最小氣溫值,說明我們的MapReduce程序沒有問題。

MapReduce程序之求一年中的最高溫度和最低溫度