1. 程式人生 > >18-hadoop-weather案例

18-hadoop-weather案例

ping ide exc 所有 void 每年 [] exce framework

weather案例, 簡單分析每年的前三個月的最高溫即可, 使用自定義的分組和排序

1, MyKey,

因為對溫度進行分組, 排序, partition操作, 所以默認的字典順序不能滿足需求

package com.wenbronk.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key carrying (year, month, temperature).
 *
 * Implements WritableComparable so Hadoop can serialize the key and
 * compare instances during the sort/group/partition phases, where the
 * default lexicographic text ordering would not produce the required
 * order.
 */
public class MyKey implements WritableComparable<MyKey> {

    private int year;
    private int month;
    private double hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public double getHot() {
        return hot;
    }

    public void setHot(double hot) {
        this.hot = hot;
    }

    /** Deserialize the fields in the exact order they were written. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
    }

    /** Serialize the fields; the order must match readFields. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
    }

    /**
     * Natural ordering: year, then month, then temperature (ascending).
     *
     * BUG FIX: the original returned the constant 1 whenever year or
     * month differed, violating the compareTo contract
     * (sgn(a.compareTo(b)) must equal -sgn(b.compareTo(a))) and
     * producing an inconsistent sort order.
     */
    @Override
    public int compareTo(MyKey o) {
        int c = Integer.compare(this.year, o.getYear());
        if (c != 0) {
            return c;
        }
        c = Integer.compare(this.month, o.getMonth());
        if (c != 0) {
            return c;
        }
        return Double.compare(this.hot, o.getHot());
    }
}

2, sort

package com.wenbronk.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom sort comparator for the map-output keys.
 *
 * Orders keys by year ascending, then month ascending, then
 * temperature descending, so each (year, month) group's hottest
 * readings reach the reducer first.
 *
 * @author root
 */
public class MySort extends WritableComparator {

    /**
     * The super call registers MyKey as the compared type;
     * passing true tells the parent to instantiate it.
     */
    public MySort() {
        super(MyKey.class, true);
    }

    /**
     * Compares two map-output keys.
     * Same year: fall through to month; same month: compare
     * temperature in reverse (hottest first).
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }

        int byMonth = Integer.compare(left.getMonth(), right.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }

        // Swapped operands give the descending temperature order.
        return Double.compare(right.getHot(), left.getHot());
    }
}

3, group

package com.wenbronk.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Custom grouping comparator.
 *
 * Keys that share the same year and month are treated as one group,
 * so a single reduce() call sees all temperatures of that month.
 *
 * @author root
 */
public class MyGroup extends WritableComparator {

    public MyGroup() {
        super(MyKey.class, true);
    }

    /**
     * Two keys belong to the same group when both year and month match.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey left = (MyKey) a;
        MyKey right = (MyKey) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        return Integer.compare(left.getMonth(), right.getMonth());
    }
}

4, partition

package com.wenbronk.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * Custom partitioner: routes all records of one year to the same
 * reduce task, so each reducer processes exactly one year.
 * Receives the (key, value) pairs emitted by the mapper.
 *
 * @author root
 */
public class MyPartition extends HashPartitioner<MyKey, DoubleWritable> {

    /** Earliest year in the data set; subtracting it keeps partition ids small. */
    private static final int BASE_YEAR = 1949;

    /**
     * Called once for every record the map task outputs, so it must
     * stay cheap. The number of distinct years is known, so the
     * reduce-task count can be set to match (via configuration or at
     * job-submission time).
     *
     * BUG FIX: uses Math.floorMod instead of the % operator — % yields
     * a negative (invalid) partition number for years before BASE_YEAR.
     */
    @Override
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        return Math.floorMod(key.getYear() - BASE_YEAR, numReduceTasks);
    }
}

5, 執行類

package com.wenbronk.weather;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the MapReduce job that reports the top three temperatures
 * of every (year, month) group in the weather data.
 *
 * @author wenbronk
 */
public class RunMapReduce {

    public static void main(String[] args) throws Exception {
        // Loads every configuration file found on src / the classpath.
        Configuration configuration = new Configuration();

        // Local execution.
        // BUG FIX: the correct key is "fs.defaultFS" (the original used
        // "fs.default"), and the original value carried a trailing
        // space that corrupts the filesystem URI.
        configuration.set("fs.defaultFS", "hdfs://wenbronk.hdfs.com:8020");
        // NOTE(review): "yarn.resourcemanager" looks like an incomplete
        // key (usually ".hostname" or ".address") and an hdfs:// scheme
        // is unusual here — confirm against the cluster setup.
        configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106");

        // Server execution (kept for reference):
//        configuration.set("mapred.jar", "E:\\sxt\\target\\weather.jar");
//        configuration.set("mapreduce.app-submission.cross-platform", "true");
//        configuration.set("mapreduce.framework.name", "yarn");
//        configuration.set("yarn.resourcemanager.address", "192.168.208.106:" + 8030);
//        configuration.set("yarn.resourcemanager.scheduler.address", "192.168.208.106:" + 8032);

        // BUG FIX: pass the Configuration to the job — the original
        // called Job.getInstance() with no arguments, so none of the
        // settings made above ever reached the job.
        Job job = Job.getInstance(configuration);
        // Entry-point class so the correct jar is shipped.
        job.setJarByClass(RunMapReduce.class);
        job.setJobName("weather");

        // Mapper / reducer and the map-output key/value types.
        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReduce.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // Custom partitioning, sorting and grouping.
        job.setPartitionerClass(MyPartition.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);

        // One reduce task per year present in the sample data.
        job.setNumReduceTasks(3);

        // KeyValueTextInputFormat splits each line on the first "\t":
        // left side becomes the key, right side the value. The separator
        // is configurable via
        // "mapreduce.input.keyvaluelinerecordreader.key.value.separator".
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        // Input file (upload data.txt to HDFS /root/usr/ for cluster runs).
        FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weather.txt"));
//        FileInputFormat.addInputPath(job, new Path("/root/usr/weather.txt"));

        // The output directory must not exist yet; remove it if it does.
        Path path = new Path("/root/usr/weather");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        boolean forCompletion = job.waitForCompletion(true);
        if (forCompletion) {
            System.out.println("success");
        }
    }

    /**
     * Mapper: parses "yyyy-MM-dd HH:mm:ss\t<temp>c" records (already
     * split into key/value by KeyValueTextInputFormat) and emits
     * (MyKey, temperature) pairs.
     *
     * @author wenbronk
     */
    static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {

        // SimpleDateFormat is not thread-safe, but each map task owns
        // its own mapper instance, so a per-instance formatter is safe.
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * The date string arrives as the key (text left of the first
         * "\t"), the temperature as the value.
         */
        @Override
        protected void map(Text key, Text value, Mapper<Text, Text, MyKey, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            try {
                Date date = formatter.parse(key.toString());
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(date);
                int year = calendar.get(Calendar.YEAR);
                // BUG FIX: Calendar.MONTH is zero-based (January == 0);
                // add 1 so the emitted month matches the input data.
                int month = calendar.get(Calendar.MONTH) + 1;

                // Strip the trailing "c" unit, e.g. "34c" -> 34.0.
                double hot = Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));

                MyKey mykey = new MyKey();
                mykey.setYear(year);
                mykey.setMonth(month);
                mykey.setHot(hot);

                context.write(mykey, new DoubleWritable(hot));
            } catch (ParseException e) {
                // Best-effort: skip unparseable lines but keep a trace.
                e.printStackTrace();
            }
        }
    }

    /**
     * Reducer: records arrive partitioned by year (MyPartition),
     * grouped by (year, month) (MyGroup) and sorted with temperature
     * descending (MySort), so the first three values of each group are
     * that month's top three temperatures.
     *
     * @author root
     */
    static class WeatherReduce extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {

        @Override
        protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
                Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2)
                throws IOException, InterruptedException {
            int emitted = 0;
            for (DoubleWritable hot : arg1) {
                // The key already carries year and month for the output line.
                String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + hot.get();
                arg2.write(new Text(msg), NullWritable.get());
                // Keep only the top three per group.
                if (++emitted == 3) {
                    break;
                }
            }
        }
    }
}

初始文檔

1949-10-01 14:21:02    34c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

系列來自尚學堂視頻

18-hadoop-weather案例