
Hadoop in Action: max, min, and avg Statistics with MapReduce (Part 6)


1. Data preparation:

Mike,35
Steven,40
Ken,28
Cindy,32

2. Expected results

Max 40
Min 28
Avg 33
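
As a quick sanity check in plain Java (a standalone snippet, not part of the job), the four ages give exactly these numbers. Note that the true mean is 135 / 4 = 33.75; the 33 above comes from the same integer division the reducer uses:

int[] ages = {35, 40, 28, 32};
int min = Integer.MAX_VALUE, max = Integer.MIN_VALUE, sum = 0;
for (int age : ages) {
    min = Math.min(min, age);
    max = Math.max(max, age);
    sum += age;
}
System.out.println("Max " + max);                 // Max 40
System.out.println("Min " + min);                 // Min 28
System.out.println("Avg " + (sum / ages.length)); // Avg 33 (integer division)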

3. The MapReduce code is as follows:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class AgeMapReduce {

    public static class WordCountMapper extends Mapper<Object, Text, Text, Text> {
        private Text nameKey = new Text();
        private Text ageValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String content = itr.nextToken();
                String[] nameAndAge = content.split(",");
                String age = nameAndAge[1]; // nameAndAge[0] (the name) is not needed
                // Emit one fixed key so every age is grouped into a single reduce call.
                nameKey.set("only you");
                ageValue.set(age);
                context.write(nameKey, ageValue);
            }
        }
    }

    public static class WordCountReduce extends Reducer<Text, Text, Text, Text> {
        // Only one key is ever emitted, so reduce() runs exactly once and
        // these fields are effectively local state for that single call.
        private int min = Integer.MAX_VALUE;
        private int max = 0;
        private int sum = 0;
        private int count = 0;

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text tmpAge : values) {
                int age = Integer.valueOf(tmpAge.toString());
                if (age < min) {
                    min = age;
                }
                if (age > max) {
                    max = age;
                }
                sum += age;
                count++;
            }
            context.write(new Text("Max"), new Text(String.valueOf(max)));
            context.write(new Text("Min"), new Text(String.valueOf(min)));
            context.write(new Text("Avg"), new Text(String.valueOf(sum / count)));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: AgeMapReduce <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "age min max avg");
        job.setJarByClass(AgeMapReduce.class);
        job.setMapperClass(WordCountMapper.class);
        // No combiner is set; see the note after this listing.
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));   // e.g. user/joe/wordcount/input
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // e.g. user/joe/wordcount/output
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
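
One design note on the combiner: reusing the reducer as a combiner would be safe for min and max (both are associative), but not for the average, because an average of per-mapper averages is not the overall average. A correct combiner has to forward partial sums and counts instead. Below is a minimal sketch of that idea; the class name PartialStatCombiner and the "min,max,sum,count" value encoding are illustrative assumptions, and the reducer would need to be adapted to parse that encoding.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner sketch: pre-aggregates each mapper's ages into one
// "min,max,sum,count" record instead of forwarding every raw age.
public class PartialStatCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        long sum = 0;
        long count = 0;
        for (Text v : values) {
            int age = Integer.parseInt(v.toString());
            min = Math.min(min, age);
            max = Math.max(max, age);
            sum += age;
            count++;
        }
        // Partial sums and counts stay additive across combiners,
        // so the reducer can still compute the exact global average.
        context.write(key, new Text(min + "," + max + "," + sum + "," + count));
    }
}

The key property is that min, max, sum, and count are all mergeable across partial results, so combining them early reduces shuffle traffic without changing the final answer.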


4. Notes

Because the final result does not depend on any particular key, the map stage simply emits one fixed key for every record, so that all ages are grouped together.
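
After the shuffle, the single reduce call therefore sees all the ages grouped under that one key, conceptually ("only you", [35, 40, 28, 32]). The trade-off is that one reducer processes the entire dataset, which is fine for a small file like this but becomes a bottleneck at scale.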
