18-hadoop-weather案例
阿新 • • 發佈:2017-08-08
標籤: hadoop, mapreduce, weather, 自定義分組排序
weather案例, 簡單分析每年的前三個月的最高溫即可, 使用自定義的分組和排序
1, MyKey,
因為對溫度進行分組, 排序, partition操作, 所以默認的字典順序不能滿足需求
package com.wenbronk.weather; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 自定義key, 對key進行分組 * 實現writableComparble方法, 可序列化並比較是否同一個對象 * @author root **/ public class MyKey implements WritableComparable<MyKey> { private int year; private int month; private double hot; public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { returnmonth; } public void setMonth(int month) { this.month = month; } public double getHot() { return hot; } public void setHot(double hot) { this.hot = hot; } /** * 反序列化 */ @Override public void readFields(DataInput arg0) throws IOException {this.year = arg0.readInt(); this.month = arg0.readInt(); this.hot = arg0.readDouble(); } /** * 序列化 */ @Override public void write(DataOutput arg0) throws IOException { arg0.writeInt(year); arg0.writeInt(month); arg0.writeDouble(hot); } /** * 比較, 判斷是否同一個對象, 當對象作為key時 */ @Override public int compareTo(MyKey o) { int c1 = Integer.compare(this.year, o.getYear()); if (c1 == 0) { int c2 = Integer.compare(this.month, o.getMonth()); if (c2 == 0) { return Double.compare(this.hot, o.getHot()); } } return 1; } }
2, sort
package com.wenbronk.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定義排序 * @author root */ public class MySort extends WritableComparator { /** * 在構造方法中, 通過調用父類構造創建MyKey * MyKey.class : 比較的對象 * true : 創建這個對象 */ public MySort() { super(MyKey.class, true); } /** * 自定義排序方法 * 傳入的比較對象為 map 輸出的key * * 年相同比較月, 月相同, 溫度降序 */ @Override public int compare(WritableComparable a, WritableComparable b) { MyKey key1 = (MyKey) a; MyKey key2 = (MyKey) b; int r1 = Integer.compare(key1.getYear(), key2.getYear()); if (r1 == 0) { int r2 = Integer.compare(key1.getMonth(), key2.getMonth()); if (r2 == 0) { // 溫度降序 return - Double.compare(key1.getHot(), key2.getHot()); }else { return r2; } } return r1; } }
3, group
package com.wenbronk.weather; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; /** * 自定義分組 * @author root * */ public class MyGroup extends WritableComparator { public MyGroup() { super(MyKey.class, true); } /** * 年, 月相同, 則為一組 */ @Override public int compare(WritableComparable a, WritableComparable b) { MyKey key1 = (MyKey) a; MyKey key2 = (MyKey) b; int r1 = Integer.compare(key1.getYear(), key2.getYear()); if (r1 == 0) { return Integer.compare(key1.getMonth(), key2.getMonth()); } return r1; } }
4, partition
package com.wenbronk.weather; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner; /** * 自定義partition, 保證一年一個reducer進行處理 * 從map接收值 * @author root * */ public class MyPartition extends HashPartitioner<MyKey, DoubleWritable> { /** * maptask每輸出一個數據, 調用一次此方法 * 執行時間越短越好 * 年的數量是確定的, 可以傳遞reduceTask數量, 在配置文件可設置, 在程序執行時也可設置 * */ @Override public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) { // 減去最小的, 更精確 return (key.getYear() - 1949) % numReduceTasks; } }
5, 執行類
package com.wenbronk.weather;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the "weather" job: emits the top three temperatures per
 * (year, month) using the custom key, sort, group and partition classes
 * in this package.
 *
 * @author wenbronk
 */
public class RunMapReduce {

    public static void main(String[] args) throws Exception {
        // Loads *-site.xml from the classpath, then overrides for a local run
        // against a remote cluster.
        Configuration configuration = new Configuration();
        // BUGFIX: the canonical key is "fs.defaultFS" (original used "fs.default"
        // with a trailing space in the URI, which Hadoop silently ignores).
        configuration.set("fs.defaultFS", "hdfs://wenbronk.hdfs.com:8020");
        configuration.set("yarn.resourcemanager", "hdfs://192.168.208.106");

        // BUGFIX: the configuration must be passed to the job — the original
        // called Job.getInstance() with no args, so every setting above was
        // never applied to the submitted job.
        Job job = Job.getInstance(configuration);
        job.setJarByClass(RunMapReduce.class);
        job.setJobName("weather");

        job.setMapperClass(WeatherMapper.class);
        job.setReducerClass(WeatherReduce.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // custom partition / sort / group (see MyPartition, MySort, MyGroup)
        job.setPartitionerClass(MyPartition.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);

        // one reducer per year in the data set (1949-1951)
        job.setNumReduceTasks(3);

        // First "\t"-separated column of each line becomes the key, the rest
        // the value. The separator is configurable via
        // "mapreduce.input.keyvaluelinerecordreader.key.value.separator".
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        FileInputFormat.addInputPath(job, new Path("E:\\sxt\\1-MapReduce\\data\\weather.txt"));

        // Output directory must not pre-exist; remove it if it does.
        Path path = new Path("/root/usr/weather");
        FileSystem fs = FileSystem.get(configuration);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        if (job.waitForCompletion(true)) {
            System.out.println("success");
        }
    }

    /**
     * Parses each record into a {@link MyKey}. The key text is a timestamp
     * "yyyy-MM-dd HH:mm:ss"; the value text is a temperature like "34c".
     */
    static class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {

        // SimpleDateFormat is not thread-safe, but each mapper instance is
        // used by a single thread, so a per-instance formatter is fine.
        DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Splits the timestamp into year/month and the value into a double
         * temperature, then emits (MyKey, temperature).
         */
        @Override
        protected void map(Text key, Text value,
                Mapper<Text, Text, MyKey, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            try {
                Date date = formatter.parse(key.toString());
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(date);
                int year = calendar.get(Calendar.YEAR);
                // BUGFIX: Calendar.MONTH is 0-based (January == 0); +1 so the
                // reducer prints the real month. The shift is uniform, so
                // sorting/grouping/partitioning semantics are unchanged.
                int month = calendar.get(Calendar.MONTH) + 1;
                // strip the trailing unit character, e.g. "34c" -> 34.0
                double hot = Double.parseDouble(
                        value.toString().substring(0, value.toString().lastIndexOf("c")));

                MyKey mykey = new MyKey();
                mykey.setYear(year);
                mykey.setMonth(month);
                mykey.setHot(hot);
                context.write(mykey, new DoubleWritable(hot));
            } catch (ParseException e) {
                // best-effort: skip malformed records but keep a trace
                e.printStackTrace();
            }
        }
    }

    /**
     * Values reach the reducer already sorted (temperature descending within a
     * year/month group, courtesy of MySort/MyGroup), so the first three
     * iterations are the three hottest readings of that group.
     */
    static class WeatherReduce extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {

        NullWritable nullWritable = NullWritable.get();

        @Override
        protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
                Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2)
                throws IOException, InterruptedException {
            int i = 0;
            for (DoubleWritable doubleWritable : arg1) {
                i++;
                // the grouping key already carries year/month of this group
                String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + doubleWritable.get();
                arg2.write(new Text(msg), nullWritable);
                // keep only the top three per group
                if (i == 3) {
                    break;
                }
            }
        }
    }
}
初始文檔
1949-10-01 14:21:02 34c 1949-10-02 14:01:02 36c 1950-01-01 11:21:02 32c 1950-10-01 12:21:02 37c 1951-12-01 12:21:02 23c 1950-10-02 12:21:02 41c 1950-10-03 12:21:02 27c 1951-07-01 12:21:02 45c 1951-07-02 12:21:02 46c 1951-07-03 12:21:03 47c
系列來自尚學堂視頻
18-hadoop-weather案例