hadoop二次排序 (Map/Reduce中分割槽和分組的問題)
1.二次排序概念:
首先按照第一欄位排序,然後再對第一欄位相同的行按照第二欄位排序,注意不能破壞第一次排序的結果 。
如: 輸入檔案:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
輸出(需要分割線):
------------------------------------------------
1 2
------------------------------------------------
3 4
------------------------------------------------
5 6
------------------------------------------------
7 8
7 82
------------------------------------------------
12 211
------------------------------------------------
20 21
20 53
20 522
------------------------------------------------
31 42
------------------------------------------------
40 511
------------------------------------------------
50 51
50 52
50 53
50 53
50 54
50 62
50 512
50 522
------------------------------------------------
60 51
60 52
60 53
60 56
60 56
60 57
60 57
60 61
------------------------------------------------
63 61
------------------------------------------------
70 54
70 55
70 56
70 57
70 58
70 58
------------------------------------------------
71 55
71 56
------------------------------------------------
73 57
------------------------------------------------
74 58
------------------------------------------------
203 21
------------------------------------------------
530 54
------------------------------------------------
730 54
------------------------------------------------
740 58
2.工作原理
使用如下map和reduce:(特別注意輸入輸出型別, 其中IntPair為自定義型別)
public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>
在map階段,使用job.setInputFormatClass(TextInputFormat)做為輸入格式。注意輸出應該符合自定義Map中定義的輸出<IntPair, IntWritable>。最終是生成一個List<IntPair, IntWritable>。在map階段的最後,會先呼叫job.setPartitionerClass對這個List進行分割槽,每個分割槽對映到一個reducer。每個分割槽內又呼叫job.setSortComparatorClass設定的key比較函式類排序。可以看到,這本身就是一個二次排序。如果沒有通過job.setSortComparatorClass設定key比較函式類,則使用key的實現的compareTo方法。在隨後的例子中,第一個例子中,使用了IntPair實現的compareTo方法,而在下一個例子中,專門定義了key比較函式類。
在reduce階段,reducer接收到所有對映到這個reducer的map輸出後,也是會呼叫job.setSortComparatorClass設定的key比較函式類對所有資料對排序。然後開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設定的分組函式類。只要這個比較器比較的兩個key相同,他們就屬於同一個組,它們的value放在一個value迭代器,而這個迭代器的key使用屬於同一個組的所有key的第一個key。最後就是進入Reducer的reduce方法,reduce方法的輸入是所有的(key和它的value迭代器)。同樣注意輸入與輸出的型別必須與自定義的Reducer中宣告的一致。
3,具體步驟
(1)自定義key
在mr中,所有的key是需要被比較和排序的,並且是二次,先根據partitione,再根據大小。而本例中也是要比較兩次。先按照第一欄位排序,然後再對第一欄位相同的按照第二欄位排序。根據這一點,我們可以構造一個複合類IntPair,他有兩個欄位,先利用分割槽對第一欄位排序,再利用分割槽內的比較對第二欄位排序。
所有自定義的key應該實現介面WritableComparable,因為是可序列的並且可比較的。並重載方法:
//序列化,將IntPair轉化成使用流傳送的二進位制 public void write(DataOutput out)
//key的比較 public int compareTo(IntPair o)
//另外新定義的類應該重寫的兩個方法
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce) public int hashCode()
public boolean equals(Object right)(2)由於key是自定義的,所以還需要自定義一下類:
(2.1)分割槽函式類。這是key的第一次比較。
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>
在job中使用setPartitionerClasss設定Partitioner。
(2.2)key比較函式類。這是key的第二次比較。這是一個比較器,需要繼承WritableComparator(也就是實現RawComprator介面)。
(這個就是前面說的第二種方法,但是在第三部分的程式碼中並沒有實現此函式,而是直接使用compareTo方法進行比較,所以也就不許下面一行的設定)
在job中使用setSortComparatorClass設定key比較函式類。
public static class GroupingComparator extends WritableComparator分組函式類也必須有一個建構函式,並且過載 public int compare(WritableComparable w1, WritableComparable w2)
分組函式類的另一種方法是實現介面RawComparator。
在job中使用setGroupingComparatorClass設定分組函式類。
另外注意的是,如果reduce的輸入與輸出不是同一種類型,則不要定義Combiner也使用reduce,因為Combiner的輸出是reduce的輸入。除非重新定義一個Combiner。
轉自:http://www.cnblogs.com/dandingyy/archive/2013/03/08/2950703.html