1. 程式人生 > >MapReduce中的排序和計數器

MapReduce中的排序和計數器

一:條件準備

準備sort.txt文字(每行兩列,兩列之間以Tab字元分隔,因為後面的Mapper使用split("\t")切分):

a 1

a 9

b 3

a 7

b 8

b 10

a 5

a 9

排序後輸出的文字:

a 1

a 5

a 7

a 9

a 9

b 3

b 8

b 10

二:排序介面WritableComparable

思路:將文字內容轉為一個sortBean,將此bean作為k2,使用NullWritable作為v2

sortBean實現WritableComparable介面,重寫compareTo方法,指定排序

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite MapReduce key built from the two columns of an input line.
 *
 * <p>{@code Writable} only provides serialization; {@code WritableComparable}
 * provides serialization and ordering, which is what lets this bean be used
 * as a sortable key.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} are overridden to agree
 * with {@link #compareTo(SortBean)}. The hash is derived only from the field
 * values, so two beans holding the same data always hash identically, which
 * matters whenever keys are partitioned by hash code.
 */
public class SortBean implements WritableComparable<SortBean>{
    // Composite key: the first part is column one, the second part is column two.
    private String first;
    private int second;

    /**
     * Orders keys by {@code first} (natural {@code String} order) and, when
     * those are equal, by {@code second} in ascending numeric order.
     *
     * @param o the key to compare against
     * @return a negative number if this key sorts before {@code o}, zero if
     *         both keys are equal, a positive number if it sorts after
     */
    @Override
    public int compareTo(SortBean o) {
        int byFirst = this.first.compareTo(o.first);
        if (byFirst != 0) {
            return byFirst;
        }
        // Integer.compare avoids boxing both operands just to compare them.
        return Integer.compare(this.second, o.second);
    }

    /**
     * Two beans are equal when both columns match, mirroring
     * {@code compareTo(...) == 0}.
     *
     * @param obj the object to compare with
     * @return {@code true} if {@code obj} is a {@code SortBean} with the same
     *         {@code first} and {@code second}
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SortBean)) {
            return false;
        }
        SortBean other = (SortBean) obj;
        if (this.second != other.second) {
            return false;
        }
        return this.first == null ? other.first == null : this.first.equals(other.first);
    }

    /**
     * Hash based solely on the field values, so it is identical for equal
     * keys regardless of which object instance or JVM computes it.
     *
     * @return hash code consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        int firstHash = (this.first == null) ? 0 : this.first.hashCode();
        return 31 * firstHash + this.second;
    }

    /**
     * Serializes the key: {@code first} as a UTF string followed by
     * {@code second} as an int.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    /**
     * Restores the key, reading the fields in the same order
     * {@link #write(DataOutput)} wrote them.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    /**
     * @return both columns joined by a tab character
     */
    @Override
    public String toString() {
        return first+"\t"+second;
    }
}

三:map階段,同時實現計數器


import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Custom mapper for the sort job.
 *
 * <p>Turns each tab-separated input line into a {@code SortBean} key with a
 * {@code NullWritable} value, and counts every line it receives in a custom
 * counter.
 */
public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable>{

    /**
     * Converts one input line into a {@code SortBean} key.
     *
     * <p>Blank lines (for example a trailing empty line at the end of the
     * file) are still counted as input but are skipped instead of failing on
     * a missing second column.
     *
     * @param key     the input key supplied for this line (not used here)
     * @param value   one line of text: two columns separated by a tab
     * @param context job context used for counters and output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter obtained from the context: the group is
        // "MR_INPUT_COUNT" and the counter name is "MAP_TOTAL_RESULT".
        // It counts how many lines were handed to map().
        Counter inputCounter = context.getCounter("MR_INPUT_COUNT", "MAP_TOTAL_RESULT");
        inputCounter.increment(1L);

        String line = value.toString();
        if (line.trim().isEmpty()) {
            // Nothing to parse; indexing the second column would throw.
            return;
        }

        String[] columns = line.split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setFirst(columns[0]);
        // parseInt yields the primitive directly, avoiding a boxed Integer.
        sortBean.setSecond(Integer.parseInt(columns[1]));
        context.write(sortBean,NullWritable.get());
    }
}

四:reduce階段,列舉方式實現計數器


import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
/**
 * Custom reducer for the sort job.
 *
 * <p>Writes the incoming key once for every value grouped under it, so
 * duplicate input lines are preserved in the output, and tracks its work
 * through two enum-based counters.
 *
 * <p>The original note adds that this class could also serve as a combiner,
 * since a combiner is likewise a {@code Reducer} subclass.
 */
public class SortReduce extends Reducer<SortBean,NullWritable,SortBean,NullWritable>{

    /** Custom counters, identified by enum constant rather than by string names. */
    public static enum counter{
        REDUCE_INPUT_RECORD,
        REDUCE_OUTPUT_RECORD
    }

    /**
     * Emits {@code key} once per element of {@code values}.
     *
     * <p>{@code REDUCE_INPUT_RECORD} is incremented once per call of this
     * method; {@code REDUCE_OUTPUT_RECORD} is incremented once per record
     * written.
     *
     * @param key     the key being reduced
     * @param values  the values grouped under {@code key}
     * @param context job context used for counters and output
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Input counter: one tick for this reduce() invocation.
        context.getCounter(SortReduce.counter.REDUCE_INPUT_RECORD).increment(1);

        final NullWritable emptyValue = NullWritable.get();
        for (NullWritable ignored : values) {
            // Output counter: one tick for each record written below.
            context.getCounter(SortReduce.counter.REDUCE_OUTPUT_RECORD).increment(1);
            context.write(key, emptyValue);
        }
    }
}

最終結果: