MapReduce WordCount Practice: max and avg
Max computation
Requirement: for each date, output the date and its highest temperature.
Sample data:
20170931 20.1
20170930 30.6
20170931 30.6
20170929 30.02
20170928 10.3
20170928 30.3
20170927 28.3
20170931 28.1
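For this sample, the max job below should emit one tab-separated line per date (dates come out in ascending order because MapReduce sorts the Text keys):
20170927 28.3
20170928 30.3
20170929 30.02
20170930 30.6
20170931 30.6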
Java code:
max
package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Main class
public class Demo1Max {

    // Mapper implementation: emits (date, temperature) for each input line
    public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            String date = split[0];
            // double temp = Double.parseDouble(split[1]);
            double temp = Double.valueOf(split[1]);
            context.write(new Text(date), new DoubleWritable(temp));
        }
    }

    // Reducer implementation: keeps the maximum temperature per date
    public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> value, Context context)
                throws IOException, InterruptedException {
            // key = date
            // value = list(10.1, 30.6, 12.5)
            // start from negative infinity, not Double.MIN_VALUE
            // (Double.MIN_VALUE is the smallest positive double, so negative temperatures would never win)
            double max = Double.NEGATIVE_INFINITY;
            for (DoubleWritable v : value) {
                double temp = v.get();
                if (temp > max) {
                    max = temp;
                }
            }
            context.write(key, new DoubleWritable(max));
        }
    }

    // Driver
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo1Max.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        // the reducer emits DoubleWritable values, not Text
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        int status = job.waitForCompletion(true) ? 0 : -1;
        System.exit(status);
    }
}
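Because taking a maximum is associative and commutative, the same reducer can optionally be reused as a combiner to cut down shuffle traffic. A minimal sketch, one extra line in the driver above (before the FileInputFormat/FileOutputFormat calls):

    // optional: run MyReducer on the map side as well; safe for max,
    // since combining partial maxima still yields the overall maximum
    job.setCombinerClass(MyReducer.class);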
avg
package com.beicai.wc1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo1Avg {

    // Mapper implementation: emits (date, temperature) for each input line
    public static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] split = value.toString().split(" ");
            String date = split[0];
            // double temp = Double.parseDouble(split[1]);
            double temp = Double.valueOf(split[1]);
            context.write(new Text(date), new DoubleWritable(temp));
        }
    }

    // Reducer implementation: averages the temperatures per date
    public static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> value, Context context)
                throws IOException, InterruptedException {
            // key = date
            // value = list(10.1, 30.6, 12.5)
            double sum = 0.0;
            int count = 0;
            for (DoubleWritable v : value) {
                sum += v.get();
                count++;
            }
            context.write(key, new DoubleWritable(sum / count));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(Demo1Avg.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        // the reducer emits DoubleWritable values, not Text
        job.setOutputValueClass(DoubleWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        int status = job.waitForCompletion(true) ? 0 : -1;
        System.exit(status);
    }
}
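Unlike the max job, this reducer should not be reused as a combiner: averaging pre-averaged partial results would weight the groups incorrectly (an avg combiner would have to emit partial sum/count pairs instead). For the sample data above, the expected per-date averages are:
20170927 → 28.3
20170928 → (10.3 + 30.3) / 2 = 20.3
20170929 → 30.02
20170930 → 30.6
20170931 → (20.1 + 30.6 + 28.1) / 3 ≈ 26.27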
Upload max.jar to the Linux machine.
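If the HDFS input directory does not exist yet, create it first: hdfs dfs -mkdir -p /usr/hello/in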
Upload the test file (max.txt) into the in directory: hdfs dfs -put max.txt /usr/hello/in
Run the job (input path, output path): hadoop jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201   // the output path must not already exist
Or: yarn jar ./max.jar /usr/hello/in/max.txt /usr/hello/in/201   // the output path must not already exist
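If max.jar was exported without a main class in its manifest, name the driver class explicitly (using the class from the code above): hadoop jar ./max.jar com.beicai.wc1.Demo1Max /usr/hello/in/max.txt /usr/hello/in/201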
View in the NameNode web UI (port 50070): /usr/hello/in/201/part-r-00000
View from the Linux shell: hdfs dfs -cat /usr/hello/in/201/part-r-00000