1. 程式人生 > >hadoop 天氣案例

hadoop 天氣案例

對下面一組氣溫資料進行處理,得到每個月份最高的兩個氣溫值

2018-12-12 14:30 25c
2018-12-12 15:30 26c
2017-12-12 12:30 36c
2019-01-01 14:30 22c
2018-05-05 15:30 26c
2018-05-26 15:30 37c
2018-05-06 15:30 36c
2018-07-05 15:30 36c
2018-07-05 12:30 40c
2017-12-15 12:30 16c

 

輸出格式如下:

2019-1 22
2018-12 26
2018-12 25
2018-7 40
2018-7 36
2018-5 37
2018-5 36
2017-12 36
2017-12 16

 

public class App {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        conf.set("fs.defaultFS","hdfs://hadoop01:9000");
     //windows下面執行新增一下兩個配置 conf.
set("mapreduce.app-submission.cross-platform
","true"); conf.set("mapreduce.framework.name","local"); Job job = Job.getInstance(conf); //設定jobName job.setJobName("myJob"); job.setJarByClass(App.class); //配置map //mapper類 job.setMapperClass(MyMapperClass.class); //輸出的key型別
job.setMapOutputKeyClass(TQ.class); //輸出的value型別 job.setMapOutputValueClass(IntWritable.class); //將輸出的(K,V)=>(K,V,P) //job.setPartitionerClass(MyPartitioner.class); //資料在記憶體spill(溢寫)之前先排序,注:繼承WritableComparator job.setSortComparatorClass(MySortComparator.class); //配置reduce //根據需求確定分組的維度,繼承自WritableComparator job.setGroupingComparatorClass(MyGrouping.class); //如map階段根據年、月、溫度三個維度排序,而reduce只根據年、月兩個維度 job.setReducerClass(MyReduce.class); Path input=new Path("/input/weather.txt"); Path out=new Path("/output/weather"); if(out.getFileSystem(conf).exists(out)){ out.getFileSystem(conf).delete(out,true); } //資料來源 HDFS路徑 FileInputFormat.addInputPath(job,input); //計算結果的輸出目錄 FileOutputFormat.setOutputPath(job,out); //job.setNumReduceTasks(2); job.waitForCompletion(true); } }

 

public class TQ implements WritableComparable<TQ> {

    private int year;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    private int month;
    private int day;
    /**
        溫度
     */
    private int temp;

    @Override
    public int compareTo(TQ other) {

        int c1= Integer.compare(this.getYear(),other.getYear());
        if(c1==0){
           return Integer.compare(this.getMonth(),other.getMonth());
        }
        return c1;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.year);
        out.writeInt(this.month);
        out.writeInt(this.day);
        out.writeInt(this.temp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.temp=in.readInt();
    }
}

 

 

 

/**
 * 根據年-月對map輸出進行分組
 */
public class MyGrouping extends WritableComparator {
    public MyGrouping(){
        super(TQ.class,true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ tq1 = (TQ) a;
        TQ tq2 = (TQ) b;
        if (tq1.getYear() == tq2.getYear() && tq1.getMonth() == tq2.getMonth()) {
            return 0;
        }
        return 1;
    }
}
public class MyMapperClass extends Mapper<LongWritable,Text,TQ, IntWritable> {
    TQ tq=new TQ();
    IntWritable outVal=new IntWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[]splits= value.toString().split(" ");
        String[]date=splits[0].split("-");
        tq.setYear(Integer.parseInt(date[0]));
        tq.setMonth(Integer.parseInt(date[1]));
        tq.setDay(Integer.parseInt(date[2]));

        tq.setTemp(Integer.parseInt(splits[2].replace("c","")));
        outVal.set(tq.getTemp());
        context.write(tq,outVal);

    }
}
public class MyReduce extends Reducer<TQ, IntWritable, Text,IntWritable> {
    Text txtKey=new Text();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<IntWritable> iterator = values.iterator();
        int flag=0;
        while (iterator.hasNext()) {
            if (flag == 2) {
                break;
            }
            txtKey.set(String.format("%s-%s",key.getYear(),key.getMonth()));

            IntWritable next = iterator.next();
            context.write(txtKey,next);
            flag++;
        }
    }
}
/**
    資料在記憶體spill(溢寫)之前先排序,根據年月溫度
 */
public class MySortComparator extends WritableComparator {
    public MySortComparator(){
        super(TQ.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ tq1=(TQ)a;
        TQ tq2=(TQ)b;

        int c1= Integer.compare(tq1.getYear(),tq2.getYear());
            if(c1==0){
                int c2=Integer.compare(tq1.getMonth(),tq2.getMonth());
                if (c2 == 0) {
                    return -Integer.compare(tq1.getTemp(),tq2.getTemp());
                }
                return -c2;
            }
        return -c1;
    }
}