【尚學堂·Hadoop學習】MapReduce案例1--天氣
阿新 • • 發佈:2019-04-02
png font base64 sys srx ner soft alt 時間
案例描述
找出每個月氣溫最高的2天
數據集
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
代碼
MyTQ.java
package com.hadoop.mr.tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job driver (client) for the weather example: configures and submits the
 * MapReduce job that uses {@code TMapper}, {@code TPartitioner},
 * {@code TSortComparator}, {@code TGroupingComparator} and {@code TReducer}.
 *
 * @author Lindsey
 */
public class MyTQ {

    /**
     * Configures the job, removes a pre-existing output directory, runs the job
     * and terminates the JVM with exit status 0 on success or 1 on failure.
     *
     * @param args command-line arguments; not used
     * @throws Exception if job setup, file-system access or job execution fails
     */
    public static void main(String args[]) throws Exception {
        // Load the configuration files.
        Configuration conf = new Configuration(true);

        // Create the job client.
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyTQ.class);

        // Map side.
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(Tq.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Partitioner: decides which reduce task receives each key.
        job.setPartitionerClass(TPartitioner.class);

        // Sort comparator: orders the map output keys.
        job.setSortComparatorClass(TSortComparator.class);

        // Reduce side.
        job.setNumReduceTasks(2);
        job.setReducerClass(TReducer.class);

        // Grouping comparator: keys with the same year and month form one group.
        job.setGroupingComparatorClass(TGroupingComparator.class);

        // Input and output locations.
        Path input = new Path("/user/hadoop/input/weather.txt");
        FileInputFormat.addInputPath(job, input);

        Path output = new Path("/user/hadoop/output/weather");
        // Look the file system up once and reuse it for both calls.
        org.apache.hadoop.fs.FileSystem outputFs = output.getFileSystem(conf);
        if (outputFs.exists(output)) {
            // Delete recursively so a previous run's output is replaced.
            outputFs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // Submit, wait, and report the outcome through the process exit status
        // so that a failed job is not mistaken for a successful one.
        boolean succeeded = job.waitForCompletion(true);
        System.exit(succeeded ? 0 : 1);
    }
}
TMapper.java
package com.hadoop.mr.tq;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

/**
 * Maps one input line to a ({@link Tq}, temperature) pair.
 *
 * <p>Expected line layout: a timestamp, a tab, then the temperature followed by
 * a one-character unit suffix, e.g. {@code 1949-10-01 14:21:02<TAB>34c}.
 * Only the {@code yyyy-MM-dd} prefix of the timestamp is used.
 */
public class TMapper extends Mapper<LongWritable, Text, Tq, IntWritable> {

    /** Counter group / name used to report skipped input lines. */
    private static final String COUNTER_GROUP = "TMapper";
    private static final String COUNTER_MALFORMED = "MALFORMED_RECORDS";

    // Output key and value objects, reused for every record.
    Tq mkey = new Tq();
    IntWritable mval = new IntWritable();

    // Created once per mapper instance instead of once per record.
    // Note the upper-case MM: lower-case mm would mean minutes.
    // NOTE(review): SimpleDateFormat is not thread-safe; this assumes one
    // thread per mapper instance - confirm if a multithreaded runner is used.
    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");

    /**
     * Parses one line and emits the date/temperature key with the temperature
     * as value. Malformed lines (no tab, unparseable date, empty or non-numeric
     * temperature) are skipped and counted rather than failing the task.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Split into [timestamp, temperature-with-unit].
        String[] fields = StringUtils.split(value.toString(), '\t');
        if (fields.length < 2 || fields[1].length() < 2) {
            skip(value, context);
            return;
        }

        final Date date;
        final int temperature;
        try {
            date = dateFormat.parse(fields[0]);
            // Drop the trailing unit character ("34c" -> "34").
            temperature = Integer.parseInt(fields[1].substring(0, fields[1].length() - 1));
        } catch (ParseException | NumberFormatException e) {
            skip(value, context);
            return;
        }

        Calendar cal = Calendar.getInstance();
        cal.setTime(date);

        // Key: year, month, day and temperature.
        mkey.setYear(cal.get(Calendar.YEAR));
        mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar.MONTH is zero-based
        mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
        mkey.setTemperature(temperature);

        // Value: the temperature.
        mval.set(temperature);

        context.write(mkey, mval);
    }

    /** Records a skipped input line in the job counters and on stderr. */
    private void skip(Text value, Context context) {
        context.getCounter(COUNTER_GROUP, COUNTER_MALFORMED).increment(1);
        System.err.println("Skipping malformed record: " + value);
    }
}
Tq.java
package com.hadoop.mr.tq;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Map output key of the weather job: a calendar date plus the temperature
 * recorded for it. Serialized as four ints in the order year, month, day,
 * temperature. The natural ordering compares the date only.
 */
public class Tq implements WritableComparable<Tq> {

    private int year;
    private int month;
    private int day;
    private int temperature;

    /** Writes year, month, day and temperature, in that order, as ints. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(temperature);
    }

    /** Reads the four ints back in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.temperature = in.readInt();
    }

    /**
     * Orders keys by date ascending: year first, then month, then day.
     * The temperature does not take part in this comparison.
     *
     * @param that the key to compare against
     * @return negative, zero or positive as this date is before, equal to or
     *         after {@code that}
     */
    @Override
    public int compareTo(Tq that) {
        int byYear = Integer.compare(this.year, that.getYear());
        if (byYear != 0) {
            return byYear;
        }
        int byMonth = Integer.compare(this.month, that.getMonth());
        if (byMonth != 0) {
            return byMonth;
        }
        return Integer.compare(this.day, that.getDay());
    }

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getDay() {
        return day;
    }

    public void setDay(int day) {
        this.day = day;
    }

    public int getTemperature() {
        return temperature;
    }

    public void setTemperature(int temperature) {
        this.temperature = temperature;
    }
}
TPartitioner.java
package com.hadoop.mr.tq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioning rule for the weather job: a key is assigned to a partition by
 * its year modulo the number of partitions, so all records of one year (and
 * therefore of one year-month) land in the same partition.
 *
 * @author Lindsey
 */
public class TPartitioner extends Partitioner<Tq, IntWritable> {

    /**
     * Chooses the partition for a key.
     *
     * @param key           map output key; only its year is used
     * @param value         map output value; not used
     * @param numPartitions total number of partitions
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(Tq key, IntWritable value, int numPartitions) {
        // Clear the sign bit before taking the remainder: Java's % keeps the
        // sign of the dividend, so a negative year would otherwise produce a
        // negative (invalid) partition index. Non-negative years are unaffected.
        return (key.getYear() & Integer.MAX_VALUE) % numPartitions;
    }
}
TSortComparator.java
package com.hadoop.mr.tq; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class TSortComparator extends WritableComparator{ //對字節數據中map排序 需要先將Key反序列化為對象再比較 public TSortComparator(){ super(Tq.class,true); //true是將Tq實例化 } /* 時間正序 、溫度倒序 */ @Override public int compare(WritableComparable a, WritableComparable b) { Tq t1 = (Tq) a; Tq t2 = (Tq) b; int y = Integer.compare(t1.getYear(),t2.getYear()); if(y == 0){ int m = Integer.compare(t1.getMonth(),t2.getMonth()); if(m == 0){ //加上負號實現倒序 return -Integer.compare(t1.getTemperature(),t2.getTemperature()); } return m; } return y; } }
TReducer.java
package com.hadoop.mr.tq;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
// NOTE(review): the import below is not referenced anywhere in this class and
// looks like an accidental IDE auto-import - confirm and remove.
import org.apache.hadoop.shaded.org.glassfish.grizzly.compression.lzma.impl.lz.InWindow;

/**
 * Emits up to two records per reduce group: the first record of the group and
 * the first later record whose day differs from it. With the job's sort and
 * grouping comparators (one group per year-month, hottest first) these are the
 * two hottest distinct days of the month.
 */
public class TReducer extends Reducer<Tq, IntWritable, Text, IntWritable> {

    // Output key and value objects, reused for every emitted record.
    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    /**
     * Walks the values of one group and writes at most two records.
     *
     * <p>The loop reads the day from {@code key} on every iteration and
     * compares it with the first day seen, i.e. it relies on {@code key}
     * reflecting the record currently being iterated.
     *
     * @param key     the group's key
     * @param values  the temperatures of the group; only iterated, not read
     * @param context task context used to emit output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Tq key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        boolean firstEmitted = false; // whether the group's first record was written
        int firstDay = 0;

        for (IntWritable ignored : values) {
            if (!firstEmitted) {
                firstDay = key.getDay();
                emit(key, context);
                firstEmitted = true;
                continue;
            }
            if (key.getDay() != firstDay) {
                // First record from a different day: write it and stop.
                emit(key, context);
                break;
            }
        }
    }

    /** Writes the key's current state as "year-month-day" and its temperature. */
    private void emit(Tq key, Context context) throws IOException, InterruptedException {
        rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay());
        rval.set(key.getTemperature());
        context.write(rkey, rval);
    }
}
TGroupingComparator.java
package com.hadoop.mr.tq;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for the reduce side: two {@link Tq} keys belong to the
 * same group when their year and month are equal.
 */
public class TGroupingComparator extends WritableComparator {

    /** Registers {@link Tq} as the key class and lets the base class create instances. */
    public TGroupingComparator() {
        super(Tq.class, true);
    }

    /**
     * Compares two keys by year, then by month; day and temperature are ignored.
     *
     * @param a first key; must be a {@link Tq}
     * @param b second key; must be a {@link Tq}
     * @return 0 when both keys share year and month (same group), otherwise the
     *         sign of the first differing field's comparison
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        Tq left = (Tq) a;
        Tq right = (Tq) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        return byYear != 0
                ? byYear
                : Integer.compare(left.getMonth(), right.getMonth());
    }
}
運行結果
part-r-00000
part-r-00001
【尚學堂·Hadoop學習】MapReduce案例1--天氣