1. 程式人生 > >MapReduce之概要模式2

MapReduce之概要模式2

求平均數，以及使用MapReduce自帶的計數器（Counter）進行計數

求平均數主要是使用了Combiner，並在mapper端統計了匹配和不匹配的記錄數，最後在控制台輸出。

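Combiner在map階段結束以後執行：對每個分區，資料會按key分成不同的組，框架可能對某個組中的任意若干條資料執行零次、一次或多次combiner，而不影響最終的統計結果，這要求combiner的運算滿足結合律和交換律。注意「平均數的平均數」一般不等於總平均數，所以這裡除了平均值還一併傳遞計數count，合併時按count加權求平均，這樣reducer才能安全地兼作Combiner。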

1、自定義Writable

public class AverageTuple implements Writable {
    private long count;
    private double averageTemp;


    public long getCount() {
        return count;
    }

    public double getAverageTemp() {
        return averageTemp;
    }

    public void setCount(long count) {
        this.count = count;
    }

    public void setAverageTemp(double averageTemp) {
        this.averageTemp = averageTemp;
    }

    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(count);
        dataOutput.writeDouble(averageTemp);
    }

    public void readFields(DataInput dataInput) throws IOException {
        count=dataInput.readLong();
        averageTemp=dataInput.readDouble();
    }

    @Override
    public String toString() {
        return "AverageCompute.AverageTuple{" +
                "count=" + count +
                ", averageTemp=" + averageTemp +
                '}';
    }
}

2、mapper方法

在mapper端統計計數

context.getCounter("Total","not-match").increment(1); 解釋：將Total計數器組中名為not-match的計數器加1，即統計not-match的記錄有幾條。
public class AverageMapper extends Mapper<LongWritable,Text,Text,AverageTuple> {

    Text mapKey=new Text();
    AverageTuple averageTuple=new AverageTuple();

    public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {
        mapKey.set(value.toString().split(" ")[0]);
        averageTuple.setCount(1);
        averageTuple.setAverageTemp(Double.parseDouble(value.toString().split(" ")[1]));
        context.getCounter("Total","not-match").increment(1);
        context.write(mapKey,averageTuple);
    }
}

3、reducer

public class AverageReducer extends Reducer<Text,AverageTuple,Text,AverageTuple> {

    Text uid= new Text();
    AverageTuple reduceValue=new AverageTuple();

    public void reduce(Text key,Iterable<AverageTuple> values,Context context) throws IOException, InterruptedException {
        int count=0;
        int sum=0;
        for (AverageTuple average:values){
            count+=average.getCount();
            sum+=average.getAverageTemp()*average.getCount();
        }
        reduceValue.setCount(count);
        reduceValue.setAverageTemp(sum/count);
        uid.set(key);
        context.write(uid,reduceValue);
    }

}

4、在main方法中將reducer設定為Combiner

// Key line: register the reducer as the combiner (the full driver in main below sets it too).
job.setCombinerClass(AverageReducer.class);
/**
 * Driver for the per-key average job.
 *
 * <p>Usage: {@code AverageMain <input path> <output path>}. On success, prints every
 * counter in the "Total" group; the process exit status is 0 on success and 1 on job
 * failure (2 on bad usage).
 */
public class AverageMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length < 2) {
            System.err.println("Usage: AverageMain <input path> <output path>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());

        // Tells Hadoop which jar to ship: the one containing this driver class.
        job.setJarByClass(AverageMain.class);

        // Map phase configuration.
        job.setMapperClass(AverageMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(AverageTuple.class);

        // AverageReducer consumes and produces (count, average) tuples,
        // so it can also run as the map-side combiner.
        job.setCombinerClass(AverageReducer.class);
        job.setReducerClass(AverageReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(AverageTuple.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and block until it finishes.
        boolean succeeded = job.waitForCompletion(true);
        if (succeeded) {
            for (Counter counter : job.getCounters().getGroup("Total")) {
                System.out.println(counter.getDisplayName() + "是" + counter.getValue());
            }
        }
        // Propagate the job status so calling scripts/schedulers can detect failure.
        System.exit(succeeded ? 0 : 1);
    }
}