1. 程式人生 > >案例2-mapreduce統計每年中每個月氣溫排行

案例2-mapreduce統計每年中每個月氣溫排行

如圖所示我們要計算每年中每個月氣溫倒序排行,在這個例子中我們輸入檔案中的年份只有3個,所以例子中的reduceTask個數是3個。如果不確定年份的個數,就不能使用年份維度作為reduceTask個數。

首先,上傳weather檔案到/usr/input下:

執行任務:

分別檢視/usr/output/weather下的三個檔案內容:

1949年:

1950年:

1951年:

上程式碼:

package com.jeff.mr.weather;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 定義Map Task的輸入輸出型別:
 * Mapper<Text, Text, MyKey, DoubleWritable>
 * 
 * @author jeffSheng
 * 2018年9月22日
 */
public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {

	SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//	NullWritable v =NullWritable.get();
	/**
	 * 1949-10-01 14:21:02	34c
	 * 每行第一個分割符(檔案中製表符tab)左邊為key,右邊為value,
	 * key:
	 *    對應下邊的記錄key就是1949-10-01 14:21:02
	 * 		
	 * value:
	 * 		就是34c
	 * 
	 */
	protected void map(Text key, Text value,Context context)
										throws IOException, InterruptedException {
		try {
			//將key即檔案中每行的時間字串轉為日期型別
			Date date =sdf.parse(key.toString());
			//使用Calendar獲取年月
			Calendar c =Calendar.getInstance();
			c.setTime(date);
			int year =c.get(Calendar.YEAR);
			int month =c.get(Calendar.MONTH);
			//拆分value獲得溫度
			double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
			//建立輸出資料的key,即我們自定義的MyKey
			MyKey k =new MyKey();
			k.setYear(year);
			k.setMonth(month+1);
			k.setHot(hot);
			//輸出資料型別:MyKey, DoubleWritable
			context.write(k, new DoubleWritable(hot));
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}
package com.jeff.mr.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * 自定義key:
 * 1 實現WritableComparable介面,用於實現序列化和比較兩個key是否相等
 * @author jeffSheng
 * 2018年9月22日
 */
public class MyKey implements WritableComparable<MyKey>{

	private int year;
	
	private int month;
	//溫度
	private double hot;
	
	public int getYear() {
		return year;
	}
	public void setYear(int year) {
		this.year = year;
	}
	public int getMonth() {
		return month;
	}
	public void setMonth(int month) {
		this.month = month;
	}
	public double getHot() {
		return hot;
	}
	public void setHot(double hot) {
		this.hot = hot;
	}
	
	
	
	/**
	 * 判斷物件是否是同一個物件,當該物件作為輸出的key:
	 * 比較規則:依次判斷年月日是否相等
	 */
	@Override
	public int compareTo(MyKey o) {
		int r1 =Integer.compare(this.year, o.getYear());
		if(r1==0){
			int r2 =Integer.compare(this.month, o.getMonth());
			if(r2==0){
				return  Double.compare(this.hot, o.getHot());
			}else{
				return r2;
			}
		}else{
			return r1;
		}
	}
	
	//進行反序列化
	@Override
	public void readFields(DataInput arg0) throws IOException {
		this.year=arg0.readInt();
		this.month=arg0.readInt();
		this.hot=arg0.readDouble();
	}
	
	//進行序列化
	@Override
	public void write(DataOutput arg0) throws IOException {
		arg0.writeInt(year);
		arg0.writeInt(month);
		arg0.writeDouble(hot);
	}
	@Override
	public String toString() {
		return "MyKey [year=" + year + ", month=" + month + ", hot=" + hot
				+ "]";
	}
	
}
package com.jeff.mr.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

/**
 * <MapTask端的分割槽Partition操作>
 * 
 *    使用多個Reduce Task,將每一年的資料分割槽到每一個Reduce task,預設的分割槽演算法HashPartitioner是根據key的hashcode對分割槽數取模,
 * 但是我們的Mykey的hashcode是物件的hashcode不能這麼使用,所以我們自定義分割槽演算法MyPartitioner
 * 
 * Tips:繼承預設的HashPartitioner
 * @author jeffSheng
 * 2018年9月22日
 */
public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{

	/**
	 * 計算輸出資料的分割槽號:
	 *     getPartition這個方法的呼叫時機是:mapTask每當輸出一個數據的時候就會呼叫一次,呼叫頻繁,所以執行時間越短越好。
	 * @param MyKey mapTask輸出的key
	 * @param DoubleWritable mapTask輸出的value
	 * @param numReduceTasks 分割槽數
	 * 
	 * 需求:一年一個reduce分割槽,所以根據年份個數就可以確定分割槽數numReduceTasks
	 * 1949年是年份最小的年,不知道年份直接取模就好
	 */
	public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
		return (key.getYear() - 1949) % numReduceTasks;
	}
}
package com.jeff.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * <MapTask輸出資料進行Sort排序>
 * 
 * MyKey自定義排序
 *  繼承預設的排序方法WritableComparator,演算法是根據key的ASCII碼排序,字典排序。
 * 排序完溢寫到磁碟spill to disk,然後reduceTask端phase抓取資料進行第二次排序,還是呼叫MySort程式,
 * 然後進行分組,我們自定義分組。
 * 
 * @author jeffSheng
 * 2018年9月22日
 */
public class MySort extends WritableComparator{

	//在構造方法中指定比較型別是MyKey並建立MyKey物件
	public MySort(){
		super(MyKey.class,true);
	}
	
	/**
	 * 
	 * 重寫org.apache.hadoop.io.WritableComparator的compare方法,比較排序
	 * 
	 * 需求:比較每一年的每一個月的溫度降序,即年月相同的情況下再比較溫度,溫度降序排列
	 */
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		MyKey k1 =(MyKey) a;
		MyKey k2 =(MyKey) b;
		System.out.println("【比較排序】:"+k1+"------"+k2);
		int r1 =Integer.compare(k1.getYear(), k2.getYear());
		if(r1==0){
			int r2 =Integer.compare(k1.getMonth(), k2.getMonth());
			if(r2==0){
				//降序-,其他情況年月不等
				return -Double.compare(k1.getHot(), k2.getHot());
			}else{
				return r2;
			}
		}else{
			return r1;
		}
		
	}
}
package com.jeff.mr.weather;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 計算每年每月中氣溫最高的前三個溫度
 * 
 * @author jeffSheng
 * 2018年9月22日
 */
public class WeatherReducer  extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
	
	/**
	 * 需求:輸出每年每月中氣溫最高的前三個溫度
	 */
	protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
			Context context)
			throws IOException, InterruptedException {
		int i=0;
		/**
		 * @desc 迭代二次排序後分好組的溫度列表
		 * 迴圈呼叫此方法,每組呼叫一次
		 * 	    
		 */
		for(DoubleWritable hot :arg1){
			i++;
			String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + hot.get();
			context.write(new Text(msg), NullWritable.get());
			if(i==3){
				break;
			}
		}
	}
}
package com.jeff.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * <ReduceTask端二次排序後的分組操作>
 * reduceTask端phase抓取資料進行第二次排序,還是呼叫MySort程式,
 * 然後進行分組,我們自定義分組。
 * 
 * @author jeffSheng
 * 2018年9月22日
 */
public class MyGroup extends WritableComparator{

	public MyGroup(){
		super(MyKey.class,true);
	}
	
	/**
	 * 需求:年和月相同則是一組
	 */
	public int compare(WritableComparable a, WritableComparable b) {
		MyKey k1 =(MyKey) a;
		MyKey k2 =(MyKey) b;
		int r1 =Integer.compare(k1.getYear(), k2.getYear());
		if(r1==0){
			return Integer.compare(k1.getMonth(), k2.getMonth());
		}else{
			return r1;
		}
		
	}
}
package com.jeff.mr.weather;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node1:8020");
		config.set("yarn.resourcemanager.hostname", "node1");
//		config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
		
//可以自定義key和value的分隔符
//		config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
		try {
			FileSystem fs =FileSystem.get(config);
			
			Job job =Job.getInstance(config);
			job.setJarByClass(RunJob.class);
			
			job.setJobName("weather");
			
			job.setMapperClass(WeatherMapper.class);
			job.setReducerClass(WeatherReducer.class);
			job.setMapOutputKeyClass(MyKey.class);
			job.setMapOutputValueClass(DoubleWritable.class);
			
			//設定自定義分割槽器
			job.setPartitionerClass(MyPartitioner.class);
			//設定自定義排序器
			job.setSortComparatorClass(MySort.class);
			//設定自定義分組器
			job.setGroupingComparatorClass(MyGroup.class);
			//設定分割槽個數,預設不寫則是1,例子中有3年,當然了,如果不知道多少年就不應該按照年分割槽,知道那就可以
			job.setNumReduceTasks(3);
			//預設是按照行的下標作為key,設定以下程式碼可以使得key和value分隔符為製表符\t
			job.setInputFormatClass(KeyValueTextInputFormat.class);
			
			FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));
			
			Path outpath =new Path("/usr/output/weather");
			if(fs.exists(outpath)){
				fs.delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);
			
			boolean f= job.waitForCompletion(true);
			if(f){
				System.out.println("job任務執行成功");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	
}

需要注意的是在這個例子中輸入檔案MapTask執行的時候,我們是把時間當做輸入key,把溫度當做輸入value,跟我們之前的把行的下標做輸入key不一樣,需要設定:

         //預設是按照行的下標作為key,設定以下程式碼可以使得key和value分隔符為製表符\t
            job.setInputFormatClass(KeyValueTextInputFormat.class);

如果不想使用製表符作為輸入檔案行的輸入Key和輸入Value之間的分隔符,可以自定義比如逗號:

//可以自定義key和value的分隔符
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");