A First Look at Sorting in MapReduce
阿新 • Published: 2018-09-12
1. Requirement
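Starting from the per-user (per phone number) statistics of total upstream traffic, total downstream traffic, and combined total traffic, add one more requirement: sort the results by combined total traffic in descending order.
2. Analysis
Basic idea:
Implement a custom bean that encapsulates the traffic information, and use that bean as the key of the map output.
An MR program sorts data as part of its processing: the key-value pairs emitted by map are sorted before they are passed to reduce, and the sort is based on the map output key. So to apply our own ordering, we can put the sort field into the key, have the key implement the WritableComparable interface, and override the key's compareTo method.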
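The idea of letting the key's compareTo decide the order can be tried out without a cluster. The following is a minimal sketch in plain Java, not Hadoop code: TotalFlowKey and DescendingSortSketch are hypothetical names, the phone labels and totals are made-up sample values, and Collections.sort merely stands in for the sort that the framework applies to map output keys. It uses Long.compare with swapped arguments as one way to express descending order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a map output key; only the sort field matters here.
class TotalFlowKey implements Comparable<TotalFlowKey> {
    final String phone;   // illustrative payload
    final long sumFlow;   // the field the ordering is based on

    TotalFlowKey(String phone, long sumFlow) {
        this.phone = phone;
        this.sumFlow = sumFlow;
    }

    @Override
    public int compareTo(TotalFlowKey other) {
        // Swapping the arguments turns ascending order into descending order.
        return Long.compare(other.sumFlow, this.sumFlow);
    }
}

class DescendingSortSketch {
    // Sorts a copy of the keys by their natural ordering (compareTo)
    // and returns the phone labels in the resulting order.
    static List<String> sortedPhones(List<TotalFlowKey> keys) {
        List<TotalFlowKey> copy = new ArrayList<>(keys);
        Collections.sort(copy);
        List<String> phones = new ArrayList<>();
        for (TotalFlowKey key : copy) {
            phones.add(key.phone);
        }
        return phones;
    }

    public static void main(String[] args) {
        List<TotalFlowKey> keys = new ArrayList<>();
        keys.add(new TotalFlowKey("phone-A", 300L));
        keys.add(new TotalFlowKey("phone-B", 900L));
        keys.add(new TotalFlowKey("phone-C", 500L));
        // prints [phone-B, phone-C, phone-A]
        System.out.println(sortedPhones(keys));
    }
}
```

Because the ordering lives entirely in compareTo, changing the sort rule means changing only that one method.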
3. Implementation
The custom bean
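import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    // The no-argument constructor is used during deserialization
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Getters used by compareTo and by the reducer
    public long getUpFlow() {
        return upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Serialization method
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization method
    @Override
    public void readFields(DataInput in) throws IOException {
        // Note: fields must be read in the same order in which they were written
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    // Custom comparison between beans
    @Override
    public int compareTo(FlowBean o) {
        // Sort by sumFlow in descending order.
        // This never returns 0, so beans with equal totals are not treated as equal keys.
        return this.sumFlow > o.getSumFlow() ? -1 : 1;
    }
}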
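The note about reading fields in the same order as they were written can be checked with a small round trip through a byte buffer. This is a sketch under stated assumptions: FlowRecord and RoundTripSketch are hypothetical, Hadoop-free names that only mirror the three long fields and the write/readFields pair, and the sample values are made up. It relies only on java.io, whose DataOutputStream and DataInputStream implement the DataOutput and DataInput interfaces used by the bean.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free mirror of the three flow fields.
class FlowRecord {
    long upFlow;
    long downFlow;
    long sumFlow;

    void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // Writes the fields in a fixed order: up, down, sum.
    void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Reads the fields back in exactly the order they were written.
    void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}

class RoundTripSketch {
    // Serializes the record into memory and deserializes it into a fresh object.
    static FlowRecord roundTrip(FlowRecord original) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                original.write(out);
            }
            FlowRecord copy = new FlowRecord();
            try (DataInputStream in =
                     new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                copy.readFields(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FlowRecord record = new FlowRecord();
        record.set(100L, 250L);
        FlowRecord copy = roundTrip(record);
        // prints 100 250 350
        System.out.println(copy.upFlow + " " + copy.downFlow + " " + copy.sumFlow);
    }
}
```

The stream carries no field names, only raw bytes, so swapping two readLong calls would silently swap the values of the corresponding fields.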
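// FlowSumMapper.java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    // Reused across map() calls to avoid allocating new objects per record
    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        // The phone number is the second field; the upstream and downstream
        // traffic are the third-from-last and second-from-last fields
        String phoneNum = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);

        k.set(phoneNum);
        v.set(upFlow, downFlow);
        context.write(k, v);
    }
}

// FlowSumReducer.java
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long upFlowCount = 0;
        long downFlowCount = 0;

        // Accumulate the traffic of all records that share this phone number
        for (FlowBean bean : values) {
            upFlowCount += bean.getUpFlow();
            downFlowCount += bean.getDownFlow();
        }

        v.set(upFlowCount, downFlowCount);
        context.write(key, v);
    }
}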
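To see what the mapper and reducer above compute together, the same field positions and the same per-phone accumulation can be simulated in plain Java. This is only an illustration, not the Hadoop job: FlowSumSketch is a hypothetical name, the five-column tab-separated layout and all sample values are assumptions made up for the example, and a TreeMap stands in for the framework's grouping by key.

```java
import java.util.Map;
import java.util.TreeMap;

class FlowSumSketch {
    // Mimics map + group-by-key + reduce on tab-separated lines.
    // Returns phone -> {upstream total, downstream total, combined total}.
    static Map<String, long[]> sumByPhone(String[] lines) {
        Map<String, long[]> totals = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split("\t");
            // Same positions as in the mapper: second field, third-from-last, second-from-last
            String phone = fields[1];
            long up = Long.parseLong(fields[fields.length - 3]);
            long down = Long.parseLong(fields[fields.length - 2]);

            // Same accumulation as in the reducer, keyed by phone number
            long[] t = totals.computeIfAbsent(phone, p -> new long[3]);
            t[0] += up;
            t[1] += down;
            t[2] += up + down;
        }
        return totals;
    }

    public static void main(String[] args) {
        // Assumed layout: id, phone, upFlow, downFlow, status (sample data only)
        String[] lines = {
            "1\tphone-A\t10\t20\t200",
            "2\tphone-B\t1\t2\t200",
            "3\tphone-A\t5\t5\t200"
        };
        for (Map.Entry<String, long[]> e : sumByPhone(lines).entrySet()) {
            long[] t = e.getValue();
            System.out.println(e.getKey() + "\t" + t[0] + "\t" + t[1] + "\t" + t[2]);
        }
    }
}
```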