MapReduce之概要模式2
阿新 • • 發佈:2019-02-13
求平均數和使用MapReduce的自帶的計數方法
求平均數主要是使用了Combiner,並且在mapper端統計了匹配和不匹配的記錄數目,最後在控制檯輸出
Combiner主要是在map階段結束以後,對每個分割槽的輸出按key分組做本地聚合。對某個組來說,框架可能讓其中幾條資料走combiner方法,也可能完全不走,但都不能影響最終的統計結果,因此combiner的運算必須滿足結合律和交換律
1、自定義Writable
public class AverageTuple implements Writable { private long count; private double averageTemp; public long getCount() { return count; } public double getAverageTemp() { return averageTemp; } public void setCount(long count) { this.count = count; } public void setAverageTemp(double averageTemp) { this.averageTemp = averageTemp; } public void write(DataOutput dataOutput) throws IOException { dataOutput.writeLong(count); dataOutput.writeDouble(averageTemp); } public void readFields(DataInput dataInput) throws IOException { count=dataInput.readLong(); averageTemp=dataInput.readDouble(); } @Override public String toString() { return "AverageCompute.AverageTuple{" + "count=" + count + ", averageTemp=" + averageTemp + '}'; } }
2、mapper方法
在mapper端統計計數
context.getCounter("Total","not-match").increment(1);解釋:將名為Total的計數器組中的not-match計數器加1
public class AverageMapper extends Mapper<LongWritable,Text,Text,AverageTuple> { Text mapKey=new Text(); AverageTuple averageTuple=new AverageTuple(); public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { mapKey.set(value.toString().split(" ")[0]); averageTuple.setCount(1); averageTuple.setAverageTemp(Double.parseDouble(value.toString().split(" ")[1])); context.getCounter("Total","not-match").increment(1); context.write(mapKey,averageTuple); } }
3、reducer
public class AverageReducer extends Reducer<Text,AverageTuple,Text,AverageTuple> { Text uid= new Text(); AverageTuple reduceValue=new AverageTuple(); public void reduce(Text key,Iterable<AverageTuple> values,Context context) throws IOException, InterruptedException { int count=0; int sum=0; for (AverageTuple average:values){ count+=average.getCount(); sum+=average.getAverageTemp()*average.getCount(); } reduceValue.setCount(count); reduceValue.setAverageTemp(sum/count); uid.set(key); context.write(uid,reduceValue); } }
4、在main方法中將reducer同時設定為Combiner
job.setCombinerClass(AverageReducer.class);
/**
 * Driver for the averaging job: wires {@code AverageMapper} and
 * {@code AverageReducer} (also used as the combiner) together, runs the job,
 * and on success prints every counter in the {@code Total} group.
 */
public class AverageMain {

    /**
     * Configures and runs the job, then exits with 0 on success, 1 if the
     * job failed, or 2 if the arguments are missing.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws IOException            if job setup or submission fails
     * @throws ClassNotFoundException if a job class cannot be loaded
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: AverageMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        // Must be the class that contains main() so the job jar can be located.
        job.setJarByClass(AverageMain.class);

        // Mapper settings.
        job.setMapperClass(AverageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AverageTuple.class);

        // Reducer settings; the reducer doubles as the combiner.
        job.setReducerClass(AverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(AverageTuple.class);
        job.setCombinerClass(AverageReducer.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit and block until the job finishes.
        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            for (Counter counter : job.getCounters().getGroup("Total")) {
                System.out.println(counter.getDisplayName() + "是" + counter.getValue());
            }
        }

        // Propagate the job outcome to the caller instead of always exiting 0.
        System.exit(succeeded ? 0 : 1);
    }
}