一起學Hadoop——二次排序演算法的實現
二次排序,從字面上可以理解為在對key排序的基礎上對key所對應的值value排序,也叫輔助排序。一般情況下,MapReduce框架只對key排序,而不對key所對應的值排序,因此value的排序經常是不固定的。但是我們經常會遇到同時對key和value排序的需求,例如Hadoop權威指南中的求一年的高高氣溫,key為年份,value為最高氣溫,年份按照降序排列,氣溫按照降序排列。還有水果電商網站經常會有按天統計水果銷售排行榜的需求等等,這些都是需要對key和value同時進行排序。如下圖所示:
如何設計一個MapReduce程式解決對key和value同時排序的需求呢?這就需要用到組合鍵、分割槽、分組的概念。在這裡又看到分割槽的影子,可知分割槽在MapReduce是多麼的重要,一定要好好掌握,是優化的重點。
按照上圖中資料流轉的方向,我們首先設計一個Fruit類,有三個欄位,分別是日期、水果名和銷量,將日期、水果名和銷量作為一個複合鍵;接著設計一個自定義Partition類,根據Fruit的日期欄位分割槽,讓相同日期的資料流向同一個partition分割槽中;最後定義一個分組類,實現同一個分割槽內的資料分組,然後按照銷量欄位進行二次排序。
具體實現思路:1、定義Fruit類,實現WritableComparable介面,並且重寫compareTo、equal和hashcode方法以及序列化和反序列化方法readFields和write方法。Java類要在網路上傳輸必須序列化和反序列化。在Map端的map函式中將Fruit物件當做key。compareTo方法用於比較兩個key的大小,在本文中就是比較兩個Fruit物件的排列順序。
2、自定義第一次排序類,繼承WritableComparable或者WritableComparator介面,重寫compareTo或者compare方法,。就是在Map端對Fruit物件的第一個欄位進行排序
3、自定義Partition類,實現Partitioner介面,並且重寫getPartition方法,將日期相同的Fruit物件分發到同一個partition中。
4、定義分組類,繼承WritableComparator介面,並且重寫compare方法。用於比較同一分組內兩個Fruit物件的排列順序,根據銷量欄位比較。日期相同的Fruit物件會劃分到同一個分組。通過setGroupingComparatorClass方法設定分組類。如果不設定分組類,則按照key預設的compare方法來對key進行排序。
程式碼如下:
1 import org.apache.hadoop.conf.Configured; 2 import org.apache.hadoop.io.WritableComparable; 3 import java.io.DataInput; 4 import java.io.DataOutput; 5 import java.io.IOException; 6 import org.apache.hadoop.io.*; 7 import org.apache.hadoop.mapreduce.Partitioner; 8 import org.apache.hadoop.mapreduce.Mapper; 9 import org.apache.hadoop.mapreduce.Reducer; 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.fs.FileSystem; 12 import org.apache.hadoop.fs.Path; 13 import org.apache.hadoop.mapreduce.Job; 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 16 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; 17 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 18 import org.apache.hadoop.util.Tool; 19 import org.apache.hadoop.util.ToolRunner; 20 import org.slf4j.Logger; 21 import org.slf4j.LoggerFactory; 22 23 public class SecondrySort extends Configured implements Tool { 24 25 static class Fruit implements WritableComparable<Fruit>{ 26 private static final Logger logger = LoggerFactory.getLogger(Fruit.class); 27 private String date; 28 private String name; 29 private Integer sales; 30 public Fruit(){ 31 } 32 public Fruit(String date,String name,Integer sales){ 33 this.date = date; 34 this.name = name; 35 this.sales = sales; 36 } 37 38 public String getDate(){ 39 return this.date; 40 } 41 42 public String getName(){ 43 return this.name; 44 } 45 46 public Integer getSales(){ 47 return this.sales; 48 } 49 50 @Override 51 public void readFields(DataInput in) throws IOException{ 52 this.date = in.readUTF(); 53 this.name = in.readUTF(); 54 this.sales = in.readInt(); 55 } 56 57 @Override 58 public void write(DataOutput out) throws IOException{ 59 out.writeUTF(this.date); 60 out.writeUTF(this.name); 61 out.writeInt(sales); 62 } 63 64 @Override 65 public int compareTo(Fruit other) { 66 int result1 = this.date.compareTo(other.getDate()); 67 if(result1 == 0) { 68 int result2 = this.sales - other.getSales(); 69 if (result2 == 0) { 70 double result3 = this.name.compareTo(other.getName()); 71 if(result3 > 0) return -1; 72 else if(result3 < 0) return 1; 73 else return 0; 74 }else if(result2 >0){ 75 return -1; 76 }else if(result2 < 0){ 77 return 1; 78 } 79 }else if(result1 > 0){ 80 return -1; 81 }else{ 82 return 1; 83 } 84 return 0; 85 } 86 87 @Override 88 public int hashCode(){ 89 return this.date.hashCode() * 157 + this.sales + this.name.hashCode(); 90 } 91 92 @Override 93 public boolean equals(Object object){ 94 if (object == null) 95 return false; 96 if (this == object) 97 return true; 98 if (object instanceof Fruit){ 99 Fruit r = (Fruit) object; 100 // if(r.getDate().toString().equals(this.getDate().toString())){ 101 return r.getDate().equals(this.getDate()) && r.getName().equals(this.getName()) 102 && this.getSales() == r.getSales(); 103 }else{ 104 return false; 105 } 106 } 107 108 public String toString() { 109 return this.date + " " + this.name + " " + this.sales; 110 } 111 112 } 113 114 static class FruitPartition extends Partitioner<Fruit, NullWritable>{ 115 @Override 116 public int getPartition(Fruit key, NullWritable value,int numPartitions){ 117 return Math.abs(Integer.parseInt(key.getDate()) * 127) % numPartitions; 118 } 119 } 120 121 public static class GroupingComparator extends WritableComparator{ 122 protected GroupingComparator(){ 123 super(Fruit.class, true); 124 } 125 126 @Override 127 public int compare(WritableComparable w1, WritableComparable w2){ 128 Fruit f1 = (Fruit) w1; 129 Fruit f2 = (Fruit) w2; 130 131 if(!f1.getDate().equals(f2.getDate())){ 132 return f1.getDate().compareTo(f2.getDate()); 133 }else{ 134 return f1.getSales().compareTo(f2.getSales()); 135 } 136 } 137 } 138 139 public static class Map extends Mapper<LongWritable, Text, Fruit, NullWritable> { 140 141 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 142 String line = value.toString(); 143 String str[] = line.split(" "); 144 Fruit fruit = new Fruit(str[0],str[1],new Integer(str[2])); 145 //Fruit fruit = new Fruit(); 146 //fruit.set(str[0],str[1],new Integer(str[2])); 147 context.write(fruit, NullWritable.get()); 148 } 149 } 150 151 public static class Reduce extends Reducer<Fruit, NullWritable, Text, NullWritable> { 152 153 public void reduce(Fruit key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { 154 String str = key.getDate() + " " + key.getName() + " " + key.getSales(); 155 context.write(new Text(str), NullWritable.get()); 156 } 157 } 158 159 @Override 160 public int run(String[] args) throws Exception { 161 Configuration conf = new Configuration(); 162 // 判斷路徑是否存在,如果存在,則刪除 163 Path mypath = new Path(args[1]); 164 FileSystem hdfs = mypath.getFileSystem(conf); 165 if (hdfs.isDirectory(mypath)) { 166 hdfs.delete(mypath, true); 167 } 168 169 Job job = Job.getInstance(conf, "Secondry Sort app"); 170 // 設定主類 171 job.setJarByClass(SecondrySort.class); 172 173 // 輸入路徑 174 FileInputFormat.setInputPaths(job, new Path(args[0])); 175 // 輸出路徑 176 FileOutputFormat.setOutputPath(job, new Path(args[1])); 177 178 // Mapper 179 job.setMapperClass(Map.class); 180 // Reducer 181 job.setReducerClass(Reduce.class); 182 183 // 分割槽函式 184 job.setPartitionerClass(FruitPartition.class); 185 186 // 分組函式 187 job.setGroupingComparatorClass(GroupingComparator.class); 188 189 // map輸出key型別 190 job.setMapOutputKeyClass(Fruit.class); 191 // map輸出value型別 192 job.setMapOutputValueClass(NullWritable.class); 193 194 // reduce輸出key型別 195 job.setOutputKeyClass(Text.class); 196 // reduce輸出value型別 197 job.setOutputValueClass(NullWritable.class); 198 199 // 輸入格式 200 job.setInputFormatClass(TextInputFormat.class); 201 // 輸出格式 202 job.setOutputFormatClass(TextOutputFormat.class); 203 204 return job.waitForCompletion(true) ? 0 : 1; 205 } 206 207 public static void main(String[] args) throws Exception{ 208 int exitCode = ToolRunner.run(new SecondrySort(), args); 209 System.exit(exitCode); 210 } 211 }
測試資料:
20180906 Apple 20020180904 Apple 20020180905 Banana 10020180906 Orange 30020180906 Banana 40020180904 Orange 10020180905 Apple 40020180904 Banana 30020180905 Orange 500
執行結果:
20180906 Banana 40020180906 Orange 30020180906 Apple 20020180905 Orange 50020180905 Apple 40020180905 Banana 10020180904 Banana 30020180904 Apple 20020180904 Orange 100
總結:
1、在使用自定義比較器時,必須有一個無參的建構函式。2、readFields和write方法中處理欄位的順序必須一致,否則會報MapReduce Error: java.io.EOFException at java.io.DataInputStream.readFully(DataInputStream.java:197)的錯誤。
瞭解更多大資料的知識請關注我的微信公眾號:summer_bigdata
歡迎可以掃碼關注本人的公眾號: