Sorting and Counters in MapReduce
1: Preparing the test data
Prepare a sort.txt text file:
a 1
a 9
b 3
a 7
b 8
b 10
a 5
a 9
Expected output after sorting:
a 1
a 5
a 7
a 9
a 9
b 3
b 8
b 10
2: The WritableComparable sorting interface
Approach: convert each line of the text into a SortBean, use this bean as k2, and use NullWritable as v2.
SortBean implements the WritableComparable interface and overrides the compareTo method to define the sort order.
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Writable: serialization interface.
 * WritableComparable: serialization plus comparison (sorting).
 */
public class SortBean implements WritableComparable<SortBean> {

    // Composite key: the first part is the first column, the second part is the second column
    private String first;
    private int second;

    /**
     * Sort by the first field; if the letters are equal,
     * sort by the second field.
     * @param o the other key
     * @return a positive number places this key after o (ascending order),
     *         a negative number places it before o, and 0 means the keys are equal
     */
    @Override
    public int compareTo(SortBean o) {
        int i = this.first.compareTo(o.first);
        if (i == 0) {
            return Integer.valueOf(this.second).compareTo(Integer.valueOf(o.second));
        }
        return i;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        return second;
    }

    public void setSecond(int second) {
        this.second = second;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
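As a quick sanity check (not part of the original job), a small standalone class such as the hypothetical SortBeanTest below can confirm that compareTo orders first by the letter and then by the number:

// Hypothetical standalone check for SortBean.compareTo; not part of the MapReduce job.
public class SortBeanTest {
    public static void main(String[] args) {
        SortBean a9 = new SortBean();
        a9.setFirst("a");
        a9.setSecond(9);

        SortBean b3 = new SortBean();
        b3.setFirst("b");
        b3.setSecond(3);

        // Negative: "a 9" sorts before "b 3" because the first column differs.
        System.out.println(a9.compareTo(b3));

        SortBean a5 = new SortBean();
        a5.setFirst("a");
        a5.setSecond(5);

        // Positive: with the same letter, 9 sorts after 5.
        System.out.println(a9.compareTo(a5));
    }
}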
3: The map phase, with a custom counter
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Custom Mapper for the sort job.
 */
public class SortMapper extends Mapper<LongWritable, Text, SortBean, NullWritable> {

    /**
     * Parse each input line into a SortBean and emit it as the key.
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Custom counter, obtained through the context.
        // It counts how many records the map phase received; the counter group is
        // MR_INPUT_COUNT and the counter name is MAP_TOTAL_RESULT.
        // The result appears in the job's console output as MAP_TOTAL_RESULT=xxx.
        Counter mr_input_count = context.getCounter("MR_INPUT_COUNT", "MAP_TOTAL_RESULT");
        mr_input_count.increment(1L); // increment by 1

        String[] split = value.toString().split("\t");
        SortBean sortBean = new SortBean();
        sortBean.setFirst(split[0]);
        sortBean.setSecond(Integer.valueOf(split[1]));
        context.write(sortBean, NullWritable.get());
    }
}
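If you want to read this counter programmatically instead of scanning the console, the driver can query it from the completed Job. A minimal sketch, assuming a hypothetical helper class CounterUtil and a Job instance that has already finished:

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper; "job" is assumed to be a completed org.apache.hadoop.mapreduce.Job.
public class CounterUtil {
    public static long mapInputRecords(Job job) throws IOException {
        // Look up the custom counter by its group and name, then read its value.
        return job.getCounters()
                .findCounter("MR_INPUT_COUNT", "MAP_TOTAL_RESULT")
                .getValue();
    }
}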
4: The reduce phase, with an enum-based counter
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Custom Reducer. It could also serve as a combiner, since both extend the
 * Reducer class; a combiner pre-aggregates map output locally before the shuffle.
 */
public class SortReduce extends Reducer<SortBean, NullWritable, SortBean, NullWritable> {

    public static enum counter {
        REDUCE_INPUT_RECORD,
        REDUCE_OUTPUT_RECORD
    }

    @Override
    protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Enum-based counter; the console output ends with
        // REDUCE_INPUT_RECORD=xxx and REDUCE_OUTPUT_RECORD=xxx.
        Counter counter = context.getCounter(SortReduce.counter.REDUCE_INPUT_RECORD);
        counter.increment(1); // one reduce input group

        for (NullWritable value : values) {
            // Output counter, incremented once per record written
            Counter counter1 = context.getCounter(SortReduce.counter.REDUCE_OUTPUT_RECORD);
            counter1.increment(1);
            context.write(key, NullWritable.get());
        }
    }
}
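The post does not show a driver, so here is a minimal sketch of one, assuming a class name JobMain and placeholder HDFS paths (both hypothetical), that wires the Mapper and Reducer together; the custom counters above then appear in the job's console output alongside the built-in ones:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Hypothetical driver class; the input and output paths are placeholders.
public class JobMain {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sort");
        job.setJarByClass(JobMain.class);

        // Read sort.txt line by line
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://node01:8020/input/sort.txt"));

        // Map: SortBean as k2, NullWritable as v2; the shuffle sorts by SortBean.compareTo
        job.setMapperClass(SortMapper.class);
        job.setMapOutputKeyClass(SortBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce: write the sorted keys back out
        job.setReducerClass(SortReduce.class);
        job.setOutputKeyClass(SortBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://node01:8020/output/sort"));

        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}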
Final result: the output file matches the sorted text shown in section 1, and the custom counters (MAP_TOTAL_RESULT, REDUCE_INPUT_RECORD, REDUCE_OUTPUT_RECORD) are printed in the job's console output.