MR模式:模擬實現從天氣資料中獲取每月最高的兩次溫度
阿新 • • 發佈:2018-12-20
資料
1949-10-01 14:21:02 34c
1949-10-01 19:21:02 38c
1949-10-02 14:01:02 36c
1950-01-01 11:21:02 32c
1950-10-01 12:21:02 37c
1951-12-01 12:21:02 23c
1950-10-02 12:21:02 41c
1950-10-03 12:21:02 27c
1951-07-01 12:21:02 45c
1951-07-02 12:21:02 46c
1951-07-03 12:21:03 47c
主程式
package com.zyd.tq;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the "TQ" weather job: reads tab-separated lines of
 * {@code "yyyy-MM-dd HH:mm:ss\t<temperature>"}, groups by year-month in
 * {@link TQMapper}, and emits each month's two highest temperatures via
 * {@link TQReducer}.
 *
 * <p>Usage: {@code TQRunner <input path> <output path>}
 */
public class TQRunner {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
            System.err.println("Usage: TQRunner <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(TQRunner.class);
        job.setJobName("TQ");

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(TQMapper.class);
        job.setReducerClass(TQReducer.class);

        // Map output types must match TQMapper's <Text, Text>; a mismatch fails
        // at startup with "unable to initialize any output collector".
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // Also declare the final (reducer) output types; the default is
        // LongWritable/Text, which does not match TQReducer's <Text, Text>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Propagate job success/failure to the shell instead of always exiting 0.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
map階段程式碼:
package com.zyd.tq;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Maps one input line of the form {@code "yyyy-MM-dd HH:mm:ss\t<temperature>"}
 * to the pair (year-month, temperature), e.g.
 * {@code "1949-10-01 14:21:02\t34c"} becomes {@code ("1949-10", "34c")}.
 *
 * @author Administrator
 */
public class TQMapper extends Mapper<Object, Text, Text, Text> {

    // Reusable output objects; allocating per record is unnecessary GC pressure.
    private final Text outKey = new Text();
    private final Text outValue = new Text();

    /**
     * Emits (year-month, temperature) for each well-formed input line.
     * Malformed or blank lines are skipped rather than crashing the task.
     */
    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Timestamp and temperature are separated by a tab.
        String[] split = value.toString().split("\t");

        // Guard against blank/malformed records: the original code threw
        // ArrayIndexOutOfBoundsException on lines without a tab, and
        // StringIndexOutOfBoundsException on timestamps shorter than 7 chars.
        if (split.length < 2 || split[0].length() < 7) {
            return;
        }

        // The first 7 characters of the timestamp are the year-month ("1949-10").
        outKey.set(split[0].substring(0, 7));
        outValue.set(split[1]);
        context.write(outKey, outValue);
    }
}
Reduce階段:
package com.zyd.tq;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * For each year-month key, emits the highest temperature, and — when the month
 * has at least two readings — the second-highest appended as
 * {@code "<max>:<second>"}.
 *
 * @author Administrator
 */
public class TQReducer extends Reducer<Text, Text, Text, Text> {

    /**
     * All values for one key (one month) arrive in {@code iterable};
     * picks the top two temperatures by numeric value.
     */
    @Override
    protected void reduce(Text key, Iterable<Text> iterable, Context context)
            throws IOException, InterruptedException {
        ArrayList<String> list = new ArrayList<String>();
        for (Text text : iterable) {
            // Hadoop reuses the Text instance across iterations; copy the value out.
            list.add(text.toString());
        }
        if (list.isEmpty()) {
            return;
        }

        // BUG FIX: the original used plain Collections.sort(list), a lexicographic
        // sort that mis-orders temperatures with different digit counts
        // (e.g. "9c" sorts above "41c"). Sort by the parsed numeric degrees.
        Collections.sort(list, new Comparator<String>() {
            @Override
            public int compare(String a, String b) {
                return Integer.compare(parseDegrees(a), parseDegrees(b));
            }
        });

        // The two highest temperatures are the last two elements.
        String maxWD = list.get(list.size() - 1);
        String tmp = "";
        if (list.size() >= 2) {
            String secondWD = list.get(list.size() - 2);
            tmp = ":" + secondWD;
        }
        context.write(key, new Text(maxWD + tmp));
    }

    /**
     * Parses a reading like {@code "34c"} to its integer degrees (34),
     * stripping a single trailing unit character if present.
     */
    private static int parseDegrees(String wd) {
        String s = wd.trim();
        if (!s.isEmpty() && !Character.isDigit(s.charAt(s.length() - 1))) {
            s = s.substring(0, s.length() - 1);
        }
        return Integer.parseInt(s);
    }
}