
MapReduce Case Study 1: Comparing Weather Temperatures

1. Requirements

Given a text file of timestamped temperature readings, find, for each month of each year, the two highest temperatures that were recorded on different days.

2. Approach

Pack year, month, day, and temperature into a composite map output key (the TQ class below). A custom sort comparator orders records by year and month ascending and by temperature descending, a grouping comparator treats all records with the same year and month as one reduce group, and the reducer emits the group's first record plus the first later record that falls on a different day.

3. Implementation

3.1 The MyWeather class

This class sets up the Hadoop job configuration and registers the classes the job loads when it runs.

package com.hadoop.mr.weather;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWeather {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWeather.class);

        //---begin Map:
        // input format class (TextInputFormat is the default)
        // job.setInputFormatClass(ooxx.class);
        // mapper class and its output key/value types
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        // partitioner class
        job.setPartitionerClass(TPartitioner.class);
        // sort comparator class
        job.setSortComparatorClass(TSortComparator.class);
        // combiner class (not used in this job)
        // job.setCombinerClass(TCombiner.class);
        //----end Map

        //----begin Reduce:
        // grouping comparator class
        job.setGroupingComparatorClass(TGroupingComparator.class);
        // reducer class
        job.setReducerClass(TReducer.class);
        //-----end Reduce

        // input path
        Path input = new Path("/data/tq/input");
        FileInputFormat.addInputPath(job, input);

        // output path; delete it recursively if it already exists
        Path output = new Path("/data/tq/output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // number of reduce tasks, matching the number of partitions
        job.setNumReduceTasks(2);

        job.waitForCompletion(true);
    }
}

3.2 The TMapper class

This class extends Mapper; its main job is to preprocess each input line, parsing it into a composite key (TQ) and a temperature value.

package com.hadoop.mr.weather;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;
//TextInputFormat.class -- the key type is LongWritable (the byte offset); the value type is Text

public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable>{
    
    //reusable key/value objects for the map output
    TQ mkey=new TQ();  // map output key
    IntWritable mval=new IntWritable(); // map output value
    
    //override the map method
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context)
            throws IOException, InterruptedException {
        /**
        1949-10-01 14:21:02     34c
        1949-10-01 19:21:02        38c
        1949-10-02 14:01:02        36c
        1950-01-01 11:21:02        32c
        1950-10-01 12:21:02        37c
        **/
        
        
        try {
            String[] strs = StringUtils.split(value.toString(),'\t');// split the line on tabs
            
            SimpleDateFormat sdf= new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(strs[0]);
            Calendar cal= Calendar.getInstance();
            cal.setTime(date);
            
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH)+1); // Calendar.MONTH is zero-based, so add 1
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length()-1));// strip the trailing 'c' and parse the temperature as an int
            mkey.setWd(wd);
            
            mval.set(wd);
            
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    
    }
}

3.3 The TQ class

This class implements the WritableComparable interface: it defines the composite key's fields and overrides the serialization (write), deserialization (readFields), and comparison (compareTo) methods.

package com.hadoop.mr.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TQ implements WritableComparable<TQ> {
    
    // key fields
    private int year;
    private int month;
    private int day;
    private int wd;  // temperature
    
    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getWd() {
        return wd;
    }

    public void setWd(int wd) {
        this.wd = wd;
    }


    
    
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
        
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.year=in.readInt();
        this.month=in.readInt();
        this.day=in.readInt();
        this.wd=in.readInt();
    }

    @Override
    public int compareTo(TQ that) {
        // contract: return 0 if x == y, a value less than 0 if x < y, a value greater than 0 if x > y
        // dates ascending: compare the years first
        int c1=Integer.compare(this.year, that.getYear());
        // years are equal: compare the months
        if(c1==0){
            int c2=Integer.compare(this.month, that.getMonth());
            // months are equal: compare the days
            if(c2==0){
                return Integer.compare(this.day, that.getDay());
            }
            return c2;
        }
        return c1;
    }

}

3.4 The TPartitioner class

This class decides which partition (and hence which reduce task) each map output key is sent to; a well-chosen rule spreads the load and avoids data skew.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TPartitioner extends Partitioner<TQ, IntWritable> {
    
    
    // conventional rule of thumb for avoiding data skew: keep the many small
    // keys together in one reduce task, and give a heavy key a task of its own
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        
        return key.hashCode() % numPartitions;
    }

}
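Note that TQ never overrides hashCode(), so key.hashCode() falls back to Object's identity hash; and because TMapper reuses a single mkey instance, that hash is constant for the entire map task, sending every record to the same partition (the skew observed in section 4.6). A minimal sketch of a fix, assuming we add a content-based hashCode() to TQ: hash only the grouping fields, because the partition must agree with TGroupingComparator so that all of a month's records reach the same reduce task.

    // Sketch (not in the original code): a content-based hashCode() for TQ.
    // Only year and month participate: partitioning on day or temperature
    // would scatter one month's records across reducers and break grouping.
    @Override
    public int hashCode() {
        return (year * 31 + month) & Integer.MAX_VALUE; // non-negative, so % numPartitions stays in range
    }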

3.5 The TSortComparator class

This class defines the sort comparator used during the shuffle: records sort by year and month ascending, then by temperature descending, so the hottest reading of each month comes first.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TSortComparator extends WritableComparator{
    
    public TSortComparator() {
        super(TQ.class,true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        
        TQ t1=(TQ) a;
        TQ t2=(TQ) b;
        
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if(c1==0){
            
            int c2= Integer.compare(t1.getMonth(), t2.getMonth());
            if(c2==0){
                return -Integer.compare(t1.getWd(), t2.getWd());// negate the result so temperatures sort in descending order
            }
            return c2;
        }
        return c1;
    }
}

3.6 The TGroupingComparator class

This class groups records along the two dimensions of year and month, so that one reduce() call sees all of a month's readings.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TGroupingComparator extends WritableComparator {

    public TGroupingComparator() {
        super(TQ.class,true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        
        TQ t1=(TQ) a;
        TQ t2=(TQ) b;
        
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if(c1==0){
            return Integer.compare(t1.getMonth(), t2.getMonth()); // same year: group by the month comparison
        }
        return c1;
    }
}

3.7 The TReducer class

This class produces the final output: for each (year, month) group it writes the month's highest temperature, then the highest temperature that fell on a different day.

package com.hadoop.mr.weather;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable>{
    
    Text rkey=new Text();
    IntWritable rval=new IntWritable();
    
    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // all records the grouping comparator treats as equal form one group, e.g.:
        //1970 01 01 88   88
        //1970 01 11 78   78
        //1970 01 21 68   68
        //1970 01 01 58   58
        
        int flag=0; // number of records emitted so far
        int day=0;  
        
        // note: as the iterator advances, Hadoop deserializes each record into
        // the same key object, so key.getDay() and key.getWd() change per value
        for (IntWritable v : values) {
            if(flag==0){
                // format the reduce key as 1970-01-01:88
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()+":"+key.getWd());
                // the reduce value is the temperature itself
                rval.set(key.getWd());
                flag++;
                day=key.getDay();
                context.write(rkey, rval);
            }
            // after the first record, emit the first record that falls on a
            // different day (the second-highest distinct-day temperature), then stop
            if(flag!=0 && day!=key.getDay()){
                rkey.set(key.getYear()+"-"+key.getMonth()+"-"+key.getDay()+":"+key.getWd());
                rval.set(key.getWd());
                context.write(rkey, rval);
                break;
            }
        }
        
        
    }
}

4. Running the program

4.1 Export the project as a jar and upload it to the server

 

4.2 Create the HDFS input directory

hdfs dfs -mkdir -p /data/tq/input

4.3 Upload the test file to the new HDFS directory

[[email protected] ~]# cat tq.txt
1949-10-01 14:21:02    34c
1949-10-01 19:21:02    38c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c
 
[[email protected] ~]# hdfs dfs -put tq.txt /data/tq/input

 

 

4.4 Run the job on the server

[[email protected] ~]# hadoop jar Myweather.jar com.hadoop.mr.weather.MyWeather
2018-12-29 22:42:01,101 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2018-12-29 22:42:01,484 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2018-12-29 22:42:01,548 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/root/.staging/job_1546092355023_0004
2018-12-29 22:42:02,025 INFO input.FileInputFormat: Total input files to process : 1
2018-12-29 22:42:02,922 INFO mapreduce.JobSubmitter: number of splits:1
2018-12-29 22:42:02,975 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address
2018-12-29 22:42:02,976 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
2018-12-29 22:42:03,643 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546092355023_0004
2018-12-29 22:42:03,644 INFO mapreduce.JobSubmitter: Executing with tokens: []
2018-12-29 22:42:03,932 INFO conf.Configuration: resource-types.xml not found
2018-12-29 22:42:03,932 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2018-12-29 22:42:04,012 INFO impl.YarnClientImpl: Submitted application application_1546092355023_0004
2018-12-29 22:42:04,064 INFO mapreduce.Job: The url to track the job: http://node04:8088/proxy/application_1546092355023_0004/
2018-12-29 22:42:04,065 INFO mapreduce.Job: Running job: job_1546092355023_0004
2018-12-29 22:42:13,301 INFO mapreduce.Job: Job job_1546092355023_0004 running in uber mode : false
2018-12-29 22:42:13,302 INFO mapreduce.Job:  map 0% reduce 0%
2018-12-29 22:42:20,490 INFO mapreduce.Job:  map 100% reduce 0%
2018-12-29 22:42:35,850 INFO mapreduce.Job:  map 100% reduce 50%
2018-12-29 22:42:38,877 INFO mapreduce.Job:  map 100% reduce 100%
2018-12-29 22:42:39,899 INFO mapreduce.Job: Job job_1546092355023_0004 completed successfully
2018-12-29 22:42:40,043 INFO mapreduce.Job: Counters: 53
    File System Counters
        FILE: Number of bytes read=254
        FILE: Number of bytes written=653891
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=366
        HDFS: Number of bytes written=141
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=2
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4437
        Total time spent by all reduces in occupied slots (ms)=29074
        Total time spent by all map tasks (ms)=4437
        Total time spent by all reduce tasks (ms)=29074
        Total vcore-milliseconds taken by all map tasks=4437
        Total vcore-milliseconds taken by all reduce tasks=29074
        Total megabyte-milliseconds taken by all map tasks=4543488
        Total megabyte-milliseconds taken by all reduce tasks=29771776
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=220
        Map output materialized bytes=254
        Input split bytes=102
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=254
        Reduce input records=11
        Reduce output records=9
        Spilled Records=22
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=351
        CPU time spent (ms)=1640
        Physical memory (bytes) snapshot=419917824
        Virtual memory (bytes) snapshot=8213352448
        Total committed heap usage (bytes)=164515840
        Peak Map Physical memory (bytes)=206139392
        Peak Map Virtual memory (bytes)=2733309952
        Peak Reduce Physical memory (bytes)=108830720
        Peak Reduce Virtual memory (bytes)=2740228096
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=264
    File Output Format Counters
        Bytes Written=141

 

 

4.5 Pull the generated output files from HDFS to the local filesystem

[[email protected] ~]# hdfs dfs -get  /data/tq/output/* ./test
 

4.6 Inspect the output files

[[email protected] test]# ls
part-r-00000  part-r-00001  _SUCCESS
[[email protected] test]# cat part-r-00000
[[email protected] test]# cat part-r-00001
1951-7-3:47    47
1951-7-2:46    46
1950-10-2:41    41
1950-10-3:27    27
1951-12-1:23    23
1950-1-1:32    32
1950-10-1:37    37
1949-10-1:38    38
1949-10-2:36    36
Partition 0 is empty, while partition 1 holds all of the job's key/value output: data skew has occurred. As noted in section 3.4, the likely culprit is the distribution rule in the TPartitioner code above: because TQ has no content-based hashCode(), the reused key object hashes to the same value for the whole map task, and every record lands in one partition.
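Besides overriding hashCode() as sketched in section 3.4, an alternative fix (hypothetical, not part of the original post) is an explicit partitioner that computes the partition directly from the grouping fields:

package com.hadoop.mr.weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical replacement for TPartitioner: derive the partition directly
// from the key's year and month, so the mapping is deterministic and all of
// a month's records still meet in the same reducer.
public class TYearMonthPartitioner extends Partitioner<TQ, IntWritable> {
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        int h = key.getYear() * 12 + (key.getMonth() - 1); // one bucket per calendar month
        return (h & Integer.MAX_VALUE) % numPartitions;
    }
}

It would be wired in with job.setPartitionerClass(TYearMonthPartitioner.class) in place of TPartitioner.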

5. Notes on the Combiner

Since the data volume here is small, this job does not configure a combiner.

Each map task can produce a large amount of local output. A combiner merges the map-side output once before it leaves the node, reducing the volume of data transferred between the map and reduce nodes and improving network I/O; it is one of MapReduce's standard optimizations. Its two roles are described below.

(1) At its most basic, a combiner performs local key aggregation: it works over the map output sorted by key and iterates the values for each key, as shown below:

  map: (K1, V1) → list(K2, V2) 
  combine: (K2, list(V2)) → list(K2, V2) 
  reduce: (K2, list(V2)) → list(K3, V3)

 

(2) A combiner can also act as a local reduce (it essentially is one). In Hadoop's built-in wordcount example, and in programs that find the maximum value, the combiner and the reducer are identical, as shown below:

  map: (K1, V1) → list(K2, V2) 
  combine: (K2, list(V2)) → list(K3, V3) 
  reduce: (K3, list(V3)) → list(K4, V4)

If wordcount ran without a combiner, all of the aggregation would be done by the reducers, which is relatively inefficient. With a combiner, each map that finishes aggregates its output locally, which speeds the job up. In Hadoop's wordcount the value is just a running count, so the summation can begin as soon as each map finishes, rather than waiting for every map to end before the reduce-side addition starts.
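As a concrete sketch (standard wordcount-style code, not part of this weather job), a combiner is simply a Reducer whose input and output types match the map output types:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A wordcount-style combiner: sums the counts for each word on the map side,
// so only one (word, partialSum) pair per word leaves the node.
public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable sum = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable v : values) {
            total += v.get();
        }
        sum.set(total);
        context.write(key, sum);
    }
}

Because addition is associative and commutative, the same class can be registered as both combiner and reducer via job.setCombinerClass(...). The weather job above is a poorer fit for a combiner: its reduce logic depends on seeing a whole month's records together in sorted order, which a map-side partial pass cannot guarantee.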