案例2-mapreduce統計每年中每個月氣溫排行
阿新 • • 發佈:2018-12-11
如圖所示我們要計算每年中每個月氣溫倒序排行,在這個例子中我們輸入檔案中的年份只有3個,所以例子中的reduceTask個數是3個。如果不確定年份的個數,就不能使用年份維度作為reduceTask個數。
首先,上傳weather檔案到/usr/input下:
執行任務:
分別檢視/usr/output/weather下的三個檔案內容:
1949年:
1950年:
1951年:
上程式碼:
package com.jeff.mr.weather; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 定義Map Task的輸入輸出型別: * Mapper<Text, Text, MyKey, DoubleWritable> * * @author jeffSheng * 2018年9月22日 */ public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> { SimpleDateFormat sdf =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // NullWritable v =NullWritable.get(); /** * 1949-10-01 14:21:02 34c * 每行第一個分割符(檔案中製表符tab)左邊為key,右邊為value, * key: * 對應下邊的記錄key就是1949-10-01 14:21:02 * * value: * 就是34c * */ protected void map(Text key, Text value,Context context) throws IOException, InterruptedException { try { //將key即檔案中每行的時間字串轉為日期型別 Date date =sdf.parse(key.toString()); //使用Calendar獲取年月 Calendar c =Calendar.getInstance(); c.setTime(date); int year =c.get(Calendar.YEAR); int month =c.get(Calendar.MONTH); //拆分value獲得溫度 double hot =Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c"))); //建立輸出資料的key,即我們自定義的MyKey MyKey k =new MyKey(); k.setYear(year); k.setMonth(month+1); k.setHot(hot); //輸出資料型別:MyKey, DoubleWritable context.write(k, new DoubleWritable(hot)); } catch (Exception e) { e.printStackTrace(); } } }
package com.jeff.mr.weather; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 自定義key: * 1 實現WritableComparable介面,用於實現序列化和比較兩個key是否相等 * @author jeffSheng * 2018年9月22日 */ public class MyKey implements WritableComparable<MyKey>{ private int year; private int month; //溫度 private double hot; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; } /** * 判斷物件是否是同一個物件,當該物件作為輸出的key: * 比較規則:依次判斷年月日是否相等 */ @Override public int compareTo(MyKey o) { int r1 =Integer.compare(this.year, o.getYear()); if(r1==0){ int r2 =Integer.compare(this.month, o.getMonth()); if(r2==0){ return Double.compare(this.hot, o.getHot()); }else{ return r2; } }else{ return r1; } } //進行反序列化 @Override public void readFields(DataInput arg0) throws IOException { this.year=arg0.readInt(); this.month=arg0.readInt(); this.hot=arg0.readDouble(); } //進行序列化 @Override public void write(DataOutput arg0) throws IOException { arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } @Override public String toString() { return "MyKey [year=" + year + ", month=" + month + ", hot=" + hot + "]"; } }
package com.jeff.mr.weather; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * <MapTask端的分割槽Partition操作> * * 使用多個Reduce Task,將每一年的資料分割槽到每一個Reduce task,預設的分割槽演算法HashPartitioner是根據key的hashcode對分割槽數取模, * 但是我們的Mykey的hashcode是物件的hashcode不能這麼使用,所以我們自定義分割槽演算法MyPartitioner * * Tips:繼承預設的HashPartitioner * @author jeffSheng * 2018年9月22日 */ public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{ /** * 計算輸出資料的分割槽號: * getPartition這個方法的呼叫時機是:mapTask每當輸出一個數據的時候就會呼叫一次,呼叫頻繁,所以執行時間越短越好。 * @param MyKey mapTask輸出的key * @param DoubleWritable mapTask輸出的value * @param numReduceTasks 分割槽數 * * 需求:一年一個reduce分割槽,所以根據年份個數就可以確定分割槽數numReduceTasks * 1949年是年份最小的年,不知道年份直接取模就好 */ public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { return (key.getYear() - 1949) % numReduceTasks; } }
package com.jeff.mr.weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* <MapTask輸出資料進行Sort排序>
*
* MyKey自定義排序
* 繼承預設的排序方法WritableComparator,演算法是根據key的ASCII碼排序,字典排序。
* 排序完溢寫到磁碟spill to disk,然後reduceTask端phase抓取資料進行第二次排序,還是呼叫MySort程式,
* 然後進行分組,我們自定義分組。
*
* @author jeffSheng
* 2018年9月22日
*/
public class MySort extends WritableComparator{
//在構造方法中指定比較型別是MyKey並建立MyKey物件
public MySort(){
super(MyKey.class,true);
}
/**
*
* 重寫org.apache.hadoop.io.WritableComparator的compare方法,比較排序
*
* 需求:比較每一年的每一個月的溫度降序,即年月相同的情況下再比較溫度,溫度降序排列
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
System.out.println("【比較排序】:"+k1+"------"+k2);
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
int r2 =Integer.compare(k1.getMonth(), k2.getMonth());
if(r2==0){
//降序-,其他情況年月不等
return -Double.compare(k1.getHot(), k2.getHot());
}else{
return r2;
}
}else{
return r1;
}
}
}
package com.jeff.mr.weather;
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 計算每年每月中氣溫最高的前三個溫度
*
* @author jeffSheng
* 2018年9月22日
*/
public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
/**
* 需求:輸出每年每月中氣溫最高的前三個溫度
*/
protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
Context context)
throws IOException, InterruptedException {
int i=0;
/**
* @desc 迭代二次排序後分好組的溫度列表
* 迴圈呼叫此方法,每組呼叫一次
*
*/
for(DoubleWritable hot :arg1){
i++;
String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + hot.get();
context.write(new Text(msg), NullWritable.get());
if(i==3){
break;
}
}
}
}
package com.jeff.mr.weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* <ReduceTask端二次排序後的分組操作>
* reduceTask端phase抓取資料進行第二次排序,還是呼叫MySort程式,
* 然後進行分組,我們自定義分組。
*
* @author jeffSheng
* 2018年9月22日
*/
public class MyGroup extends WritableComparator{
public MyGroup(){
super(MyKey.class,true);
}
/**
* 需求:年和月相同則是一組
*/
public int compare(WritableComparable a, WritableComparable b) {
MyKey k1 =(MyKey) a;
MyKey k2 =(MyKey) b;
int r1 =Integer.compare(k1.getYear(), k2.getYear());
if(r1==0){
return Integer.compare(k1.getMonth(), k2.getMonth());
}else{
return r1;
}
}
}
package com.jeff.mr.weather;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class RunJob {
public static void main(String[] args) {
Configuration config =new Configuration();
config.set("fs.defaultFS", "hdfs://node1:8020");
config.set("yarn.resourcemanager.hostname", "node1");
// config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
//可以自定義key和value的分隔符
// config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
try {
FileSystem fs =FileSystem.get(config);
Job job =Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("weather");
job.setMapperClass(WeatherMapper.class);
job.setReducerClass(WeatherReducer.class);
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(DoubleWritable.class);
//設定自定義分割槽器
job.setPartitionerClass(MyPartitioner.class);
//設定自定義排序器
job.setSortComparatorClass(MySort.class);
//設定自定義分組器
job.setGroupingComparatorClass(MyGroup.class);
//設定分割槽個數,預設不寫則是1,例子中有3年,當然了,如果不知道多少年就不應該按照年分割槽,知道那就可以
job.setNumReduceTasks(3);
//預設是按照行的下標作為key,設定以下程式碼可以使得key和value分隔符為製表符\t
job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));
Path outpath =new Path("/usr/output/weather");
if(fs.exists(outpath)){
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
boolean f= job.waitForCompletion(true);
if(f){
System.out.println("job任務執行成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
需要注意的是在這個例子中輸入檔案MapTask執行的時候,我們是把時間當做輸入key,把溫度當做輸入value,跟我們之前的把行的下標做輸入key不一樣,需要設定:
//預設是按照行的下標作為key,設定以下程式碼可以使得key和value分隔符為製表符\t
job.setInputFormatClass(KeyValueTextInputFormat.class);
如果不想使用製表符作為輸入檔案行的輸入Key和輸入Value之間的分隔符,可以自定義比如逗號:
//可以自定義key和value的分隔符
config.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");