1. 程式人生 > >一起學Hadoop——二次排序演算法的實現

一起學Hadoop——二次排序演算法的實現

二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫,key為年份,value為最高氣溫,年份按照降序排列,氣溫按照降序排列。還有水果電商網站經常會有按天統計水果銷售排行榜的需求等等,這些都是需要對key和value同時進行排序。如下圖所示:

如何設計一個MapReduce程式解決對key和value同時排序的需求呢?這就需要用到組合鍵、分割槽、分組的概念。在這裡又看到分割槽的影子,可知分割槽在MapReduce是多麼的重要,一定要好好掌握,是優化的重點。

按照上圖中資料流轉的方向,我們首先設計一個Fruit類,有三個欄位,分別是日期、水果名和銷量,將日期、水果名和銷量作為一個複合鍵;接著設計一個自定義Partition類,根據Fruit的日期欄位分割槽,讓相同日期的資料流向同一個partition分割槽中;最後定義一個分組類,實現同一個分割槽內的資料分組,然後按照銷量欄位進行二次排序。

具體實現思路:1、定義Fruit類,實現WritableComparable介面,並且重寫compareTo、equal和hashcode方法以及序列化和反序列化方法readFields和write方法。Java類要在網路上傳輸必須序列化和反序列化。在Map端的map函式中將Fruit物件當做key。compareTo方法用於比較兩個key的大小,在本文中就是比較兩個Fruit物件的排列順序。

2、自定義第一次排序類,繼承WritableComparable或者WritableComparator介面,重寫compareTo或者compare方法,。就是在Map端對Fruit物件的第一個欄位進行排序

3、自定義Partition類,實現Partitioner介面,並且重寫getPartition方法,將日期相同的Fruit物件分發到同一個partition中。

4、定義分組類,繼承WritableComparator介面,並且重寫compare方法。用於比較同一分組內兩個Fruit物件的排列順序,根據銷量欄位比較。日期相同的Fruit物件會劃分到同一個分組。通過setGroupingComparatorClass方法設定分組類。如果不設定分組類,則按照key預設的compare方法來對key進行排序。

程式碼如下:

  1 import org.apache.hadoop.conf.Configured;
  2 import org.apache.hadoop.io.WritableComparable;
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import org.apache.hadoop.io.*;
  7 import org.apache.hadoop.mapreduce.Partitioner;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Reducer;
 10 import org.apache.hadoop.conf.Configuration;
 11 import org.apache.hadoop.fs.FileSystem;
 12 import org.apache.hadoop.fs.Path;
 13 import org.apache.hadoop.mapreduce.Job;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 18 import org.apache.hadoop.util.Tool;
 19 import org.apache.hadoop.util.ToolRunner;
 20 import org.slf4j.Logger;
 21 import org.slf4j.LoggerFactory;
 22 
 23 public class SecondrySort extends Configured implements Tool {
 24 
 25     static class Fruit implements WritableComparable<Fruit>{
 26         private static final Logger logger = LoggerFactory.getLogger(Fruit.class);
 27         private String date;
 28         private String name;
 29         private Integer sales;
 30         public Fruit(){
 31         }
 32         public Fruit(String date,String name,Integer sales){
 33             this.date = date;
 34             this.name = name;
 35             this.sales = sales;
 36         }
 37 
 38         public String getDate(){
 39             return this.date;
 40         }
 41 
 42         public String getName(){
 43             return this.name;
 44         }
 45 
 46         public Integer getSales(){
 47             return this.sales;
 48         }
 49 
 50         @Override
 51         public void readFields(DataInput in) throws IOException{
 52             this.date = in.readUTF();
 53             this.name = in.readUTF();
 54             this.sales = in.readInt();
 55         }
 56 
 57         @Override
 58         public void write(DataOutput out) throws IOException{
 59             out.writeUTF(this.date);
 60             out.writeUTF(this.name);
 61             out.writeInt(sales);
 62         }
 63 
 64         @Override
 65         public int compareTo(Fruit other) {
 66             int result1 = this.date.compareTo(other.getDate());
 67             if(result1 == 0) {
 68                 int result2 = this.sales - other.getSales();
 69                 if (result2 == 0) {
 70                     double result3 = this.name.compareTo(other.getName());
 71                     if(result3 > 0) return -1;
 72                     else if(result3 < 0) return 1;
 73                     else return 0;
 74                 }else if(result2 >0){
 75                     return -1;
 76                 }else if(result2 < 0){
 77                     return 1;
 78                 }
 79             }else if(result1 > 0){
 80                 return -1;
 81             }else{
 82                 return 1;
 83             }
 84             return 0;
 85         }
 86 
 87         @Override
 88         public int hashCode(){
 89             return this.date.hashCode() * 157 + this.sales + this.name.hashCode();
 90         }
 91 
 92         @Override
 93         public boolean equals(Object object){
 94             if (object == null)
 95                 return false;
 96             if (this == object)
 97                 return true;
 98             if (object instanceof Fruit){
 99                 Fruit r = (Fruit) object;
100 //                if(r.getDate().toString().equals(this.getDate().toString())){
101                 return r.getDate().equals(this.getDate()) && r.getName().equals(this.getName())
102                         && this.getSales() == r.getSales();
103             }else{
104                 return false;
105             }
106         }
107 
108         public String toString() {
109             return this.date + " " + this.name + " " + this.sales;
110         }
111 
112     }
113 
114     static class FruitPartition extends Partitioner<Fruit, NullWritable>{
115         @Override
116         public int getPartition(Fruit key, NullWritable value,int numPartitions){
117             return Math.abs(Integer.parseInt(key.getDate()) * 127) % numPartitions;
118         }
119     }
120 
121     public static class GroupingComparator extends WritableComparator{
122         protected GroupingComparator(){
123             super(Fruit.class, true);
124         }
125 
126         @Override
127         public int compare(WritableComparable w1, WritableComparable w2){
128             Fruit f1 = (Fruit) w1;
129             Fruit f2 = (Fruit) w2;
130 
131             if(!f1.getDate().equals(f2.getDate())){
132                 return f1.getDate().compareTo(f2.getDate());
133             }else{
134                 return f1.getSales().compareTo(f2.getSales());
135             }
136         }
137     }
138 
139     public static class Map extends Mapper<LongWritable, Text, Fruit, NullWritable> {
140 
141         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
142             String line = value.toString();
143             String str[] = line.split(" ");
144             Fruit fruit = new Fruit(str[0],str[1],new Integer(str[2]));
145             //Fruit fruit = new Fruit();
146             //fruit.set(str[0],str[1],new Integer(str[2]));
147             context.write(fruit, NullWritable.get());
148         }
149     }
150 
151     public static class Reduce extends Reducer<Fruit, NullWritable, Text, NullWritable> {
152 
153         public void reduce(Fruit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
154             String str = key.getDate() + " " + key.getName() + " " + key.getSales();
155             context.write(new Text(str), NullWritable.get());
156         }
157     }
158 
159     @Override
160     public int run(String[] args) throws Exception {
161         Configuration conf = new Configuration();
162         // 判斷路徑是否存在,如果存在,則刪除
163         Path mypath = new Path(args[1]);
164         FileSystem hdfs = mypath.getFileSystem(conf);
165         if (hdfs.isDirectory(mypath)) {
166             hdfs.delete(mypath, true);
167         }
168 
169         Job job = Job.getInstance(conf, "Secondry Sort app");
170         // 設定主類
171         job.setJarByClass(SecondrySort.class);
172 
173         // 輸入路徑
174         FileInputFormat.setInputPaths(job, new Path(args[0]));
175         // 輸出路徑
176         FileOutputFormat.setOutputPath(job, new Path(args[1]));
177 
178         // Mapper
179         job.setMapperClass(Map.class);
180         // Reducer
181         job.setReducerClass(Reduce.class);
182 
183         // 分割槽函式
184         job.setPartitionerClass(FruitPartition.class);
185 
186         // 分組函式
187         job.setGroupingComparatorClass(GroupingComparator.class);
188 
189         // map輸出key型別
190         job.setMapOutputKeyClass(Fruit.class);
191         // map輸出value型別
192         job.setMapOutputValueClass(NullWritable.class);
193 
194         // reduce輸出key型別
195         job.setOutputKeyClass(Text.class);
196         // reduce輸出value型別
197         job.setOutputValueClass(NullWritable.class);
198 
199         // 輸入格式
200         job.setInputFormatClass(TextInputFormat.class);
201         // 輸出格式
202         job.setOutputFormatClass(TextOutputFormat.class);
203 
204         return job.waitForCompletion(true) ? 0 : 1;
205     }
206 
207     public static void main(String[] args) throws Exception{
208         int exitCode = ToolRunner.run(new SecondrySort(), args);
209         System.exit(exitCode);
210     }
211 }

測試資料:

20180906 Apple 20020180904 Apple 20020180905 Banana 10020180906 Orange 30020180906 Banana 40020180904 Orange 10020180905 Apple 40020180904 Banana 30020180905 Orange 500

執行結果:

20180906 Banana 40020180906 Orange 30020180906 Apple 20020180905 Orange 50020180905 Apple 40020180905 Banana 10020180904 Banana 30020180904 Apple 20020180904 Orange 100

總結:

1、在使用自定義比較器時,必須有一個無參的建構函式。2、readFields和write方法中處理欄位的順序必須一致,否則會報MapReduce Error: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197)的錯誤。

瞭解更多大資料的知識請關注我的微信公眾號:summer_bigdata

歡迎可以掃碼關注本人的公眾號: