hadoop 的排序:key排序和分組排序
引入和說明
熟悉mapreduce執行流程的都知道,mapreduce流程中,一共有兩類排序,第一種是對於key的排序,預設是是根據key的遞增排序。第二種是對於reduce的組排序,將兩條記錄的key帶入到 分組函式中,如果返回0,則說明兩個記錄是一組的,所以就把他們的value合到一個迭代器中,也就是reduce函式的第二個引數。
最下面那個程式碼(小標題是總體程式碼)包括了 map ,reduce,兩個key排序類,三個分組排序類,一個分割槽類,和一個主函式。也就是整個文章的所有程式碼。下面小片程式碼都是從裡面提取出來的,建議先看一下最後那個程式碼,對整個程式碼有一個完整性認識。
最後那個程式碼段是整個程式用的實驗資料,其實就是A-Z 和1-9順序的穿插在一起。
分割槽
分割槽的程式碼如下
static public class myPartition extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
char b = key.toString().charAt(0);
if (b >= 'A' && b <= 'Z' ) {
return 0;
}
if (b >= '0' && b <= '9')
return 1;
return 2;
}
}
這個分割槽函式很簡單,就是將記錄按照字母,數字和其他三種類型分別分到不同的是分割槽,即每次結果檔案中會包括四個檔案,分別是_SUCCESS、part-r-00000、part-r-00001、part-r-00002。part-r-00000儲存key是字母的記錄,part-r-00001儲存key是數字的記錄,part-r-00002儲存key是其他的記錄,因為資料都是我們自己寫的,不會出現其他這種情況,所以以下文章都值看part-r-00000和part-r-00001兩個檔案。具體的分割槽效果在下面展示。
第0種組合—預設排序
執行jar時,輸入第三個引數為0,即兩個排序演算法都不設定,用預設的即可,則結果檔案如下
# part-r-00000檔案
A 1
B 1
C 1
D 1
E 1
F 1
G 1
H 1
I 1
J 1
K 1
L 1
M 1
N 1
O 1
P 1
Q 1
R 1
S 1
T 1
U 1
V 1
W 1
X 1
Y 1
Z 1
# part-r-00001檔案
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
從上面看出,分割槽的效果,即把字母和數字分開到兩個結果檔案中。
而且,從結果檔案可以看出,預設的排序演算法就是按字典順序升序排列。而分組排序顯示不出來,但是,原始碼顯示,分組排序跟key排序是一樣的。
第一種組合
設定程式碼如下
if (Integer.valueOf(args[2]) == 1) {
System.out.println("==============1=================");
job.setSortComparatorClass(MySort.class);
}
MySort類的作用按照 key的第一個字元與3的餘數升序排列,具體程式碼可以看倒數第二個程式碼段。
實驗結果如下
# part-r-00000
N 9
L 8
D 9
# part-r-00001
3 3
4 3
8 3
是不是很奇怪是為什麼是這種結果,其實原因是如果只設置了key排序演算法而沒有設定分組排序,那麼分組排序也使用key排序的演算法。如果我們只想設定key排序演算法,那麼我們還要設定把分組排序設定成預設排序。但是我不知道hadoop有沒有預設排序的實現,所以我自己完成了一個預設排序,即按照字典排序升序排列(MyGroupSort2類)。這種實現是第五種組合,可以先去第五種組合那看,本文章本來就是穿插著寫的,所以也應該穿插著看。
第二種組合
if (Integer.valueOf(args[2]) == 2) {
System.out.println("======================2=============");
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroupSort.class);
}
這個組合設定了分組函式,設定的分組函式作用是 key的第一個字元與3的餘數相同的key分為一組,其實,這個函式與第1種組合的key排序實現一樣,我只不過是為了驗證,當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。也就是說,如果這種組合的結果與第1種的結果一模一樣,這麼這個結論就可以完全證實了。
下面是這種組合的結果
# part-r-00000
N 9
L 8
D 9
# part-r-00001
3 3
4 3
8 3
可以看出,第2種組合和第1種組合的結果一模一樣,當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。這個結論驗證。
下面我們還要驗證另一個結論,就是如果只設置分組排序,而不設定key排序,那麼key排序會不會使用分組排序的函式。這個驗證可以看第三種組合
第三種組合
if (Integer.valueOf(args[2]) == 3) {
System.out.println("======================3===========");
job.setGroupingComparatorClass(MyGroupSort.class);
}
執行結果如下
# part-r-00000
A 1
B 1
C 1
D 1
E 1
F 1
G 1
H 1
I 1
J 1
K 1
L 1
M 1
N 1
O 1
P 1
Q 1
R 1
S 1
T 1
U 1
V 1
W 1
X 1
Y 1
Z 1
# part-r-00001
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
這個結果跟第0種組合(也就是預設設定)的結果一樣而不與第2種組合的結果相同,也就是說如果只設置分組排序,而不設定key排序,那麼key排序是不會使用分組排序的函式的,而且還可以推出一個更重要的結論。具體論述如下。
因為我們設定了分組排序,但是從結果來看,分組排序顯然沒有起到作用,因為那些ASCII碼和3的餘數相同的key並沒有合併到一起,這是為什麼呢,其實並不是分組排序沒有起到作用,而是分組排序的一個特性,這個特性就是,分組排序時,某一個key先與下一個key做分組排序,如果返回0,則說明這兩個key是相同的,就合併到一個迭代器中,然後用下一個key與下下個key做分組排序;如果返回的不是0,則說明這兩個key不一樣,也就不合併到同一個迭代器中,而且,這個key也不再和下下個key做分組排序了。例如,1的ASCII碼是49,與3的餘數為1,而2的ASCII碼是50,與3的餘數為2,這兩個key不一樣,而且,1也不再和3 4 5 …繼續比下去,也就是把 1 單獨列出來,作為一個迭代器,而且,key排序之後,key輸入的順序是 1 2 3 4 5 6 7 8 9 ,他們的ASCII碼與3的餘數與下一個key的餘數都是不一樣的,所以也就導致了part-r-00001檔案中顯示的那樣。hadoop之所以使用這種機制,是因為本來已經有了key排序,如果你想要把他們分到一組去,你就可以使用key排序把他們分到一起去,再使用分組函式把他們分到一個迭代器中。完全不用讓分組排序使用一 一對比的方式來分組,因為這樣很浪費資源。
也許你也會想,也許就是分組排序沒起到作用呢,所以我做了另一個實驗,來證明,如果只設置分組排序,而不設定key排序,那麼分組排序還是會起作用的,請看第4中組合。
第四種組合
if (Integer.valueOf(args[2]) == 4) {
System.out.println("======================4=============");
job.setGroupingComparatorClass(MyGroupSort1.class);
}
MyGroupSort1函式只返回0,所以不管傳入什麼,得到的結果都是,這兩個key是一樣的,應該分到一個組,加入到一個迭代器中,
執行結果如下:
# part-r-00000
Z 26
# part-r-00001
9 9
看到了把,如果只設置分組排序,不設定key排序,那麼分組排序還是會起到作用的。就像這個組合,因為分組排序只返回0,所以會把所有記錄分到一個迭代器中,而且 z 和 9都是每組的最後一個,記錄個數分別是 26 和 9。所以就產生了以上這種執行結果。
下面可以去看最後一個組合,第6種組合,
第五種組合
if (Integer.valueOf(args[2]) == 5) {
System.out.println("==============5=================");
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroupSort2.class);
}
執行結果如下:
# part-r-00000
Z 1
E 1
K 1
H 1
B 1
T 1
Q 1
W 1
N 1
X 1
F 1
C 1
R 1
U 1
I 1
O 1
L 1
A 1
Y 1
V 1
S 1
P 1
M 1
J 1
G 1
D 1
# part-r-00001
6 1
9 1
3 1
1 1
7 1
4 1
5 1
2 1
8 1
這樣就跟我們預想的一樣了,先看part-r-00001檔案,因為 6 9 3的ASCII碼分別是 54 57 51,他們的ASCII碼與3的餘數都是0,而1 7 4的ASCII碼分別是 49 55 52, 與3的餘數是1,而5 2 8的ASCII碼分別是 53 50 56,與3的餘數是2,所以就成了上面顯示的那種排序方式。part-r-00000檔案的排序方式也是如此。如 Z的ASCII碼為90,90與3的餘數為0,所以排在前面,剩下的那些以此類推。
現在反過來看一下為什麼第1種組合所產生的結果。先看part-r-00001檔案,他顯示的是3 4 8,其中,他們的ASCII碼與3的餘數分別是 0 1 2。這就說明了一件事,分組排序是使用key排序的演算法。例如,6 9 3的ASCII碼與3的餘數都是0,所以就會合併到同一個迭代器中,這也就是為什麼part-r-00001檔案中3後面那個是3,也就是迭代器中有三個元素。再例如。1 7 4 的ASCII碼與3的餘數都是1,所以把他們合併到同一個迭代器中,part-r-00001檔案中 4 後面也是一個3。至於 合併之後,key是用誰的key,part-r-00001檔案中,6 9 3 裡的3,1 7 4裡的4 ,5 2 8裡的8,結論就是合併之後,用的是整個組的最後一個key。但是我在學習時。老師講的是,用整個組的最前面的一個key,我也不知道為什麼會出現矛盾,如果誰知道,請留言給我,多謝多謝。
part-r-00000檔案是這個樣子也就可以明白了,Z E K H B T Q W N的ASCII碼與3的餘數都是0,所以合併到同一個迭代器中,個數有9個,而N是整組的最後一個, part-r-00000檔案的第一行為 N 9,而第二行L 8 和第三行 D 9也是這麼來的,希望你看明白了,我已經盡了最大努力了。下面你可以看一下第二種組合。
第六種組合
if (Integer.valueOf(args[2]) == 6) {
System.out.println("======================6=============");
job.setSortComparatorClass(MySort1.class);
job.setGroupingComparatorClass(MyGroupSort.class);
}
這個組合其實和第三個組合一樣,只不過第三個組合的key排序使用的是程式預設的,而這個組合使用的是自己實現的key排序函式,其實,自己實現的key排序函式跟預設的排序函式一樣,都是按照 key的第一個字元的字典順序升序排列。這個組合的結果如下
# part-r-00000
A 1
B 1
C 1
D 1
E 1
F 1
G 1
H 1
I 1
J 1
K 1
L 1
M 1
N 1
O 1
P 1
Q 1
R 1
S 1
T 1
U 1
V 1
W 1
X 1
Y 1
Z 1
# part-r-00001
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
9 1
結果如第3種組合的結果一模一樣,我也就不多做解釋了,就是證明,預設的key排序函式就是按照 key的字典順序升序排列。
結論總結
下面是這篇文章所有的結論
1. 預設的key排序和分組排序都是按照 key的字典順序升序排列。(第0種和第6種組合證明)
2. 當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。(1,2)
3. 如果只設置分組排序,而不設定key排序,那麼key排序是不會使用分組排序的函式的。(3)
4. 分組排序時,某一個key先與下一個key做分組排序,如果返回0,則說明這兩個key是相同的,就合併到一個迭代器中,然後用下一個key與下下個key做分組排序;如果返回的不是0,則說明這兩個key不一樣,也就不合併到同一個迭代器中,而且,這個key也不再和下下個key做分組排序了。(3)
總體程式碼
public class WordCount {
private static final Log LOG = LogFactory.getLog("MyLog");
public static class MapImpl extends
Mapper<LongWritable, Text, Text, IntWritable> {
enum ERRORROW {
error_field, error_row
}
private static Text k = new Text();
private static IntWritable v = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String valuestr = value.toString();
String[] splits = StringUtils.split(valuestr, ' ');
k.set(splits[0]);
context.write(k, v);
}
}
public static class ReducerImpl extends
Reducer<Text, IntWritable, Text, IntWritable> {
private static IntWritable v = new IntWritable();
@Override
protected void reduce(Text arg0, Iterable<IntWritable> arg1,
Context arg2) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable intWritable : arg1) {
int i = intWritable.get();
sum += i;
}
v.set(sum);
arg2.write(arg0, v);
}
}
static public class myPartition extends Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
char b = key.toString().charAt(0);
if (b >= 'A' && b <= 'Z') {
return 0;
}
if (b >= '0' && b <= '9')
return 1;
return 2;
}
}
/**
* key 排序演算法,按照 key的第一個字元與3的餘數升序排列
*/
static public class MySort extends WritableComparator {
public MySort() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a == null || b == null)
return 0;
char c1 = a.toString().charAt(0);
char c2 = b.toString().charAt(0);
return c1 % 3 - c2 % 3;
}
}
/**
* key 排序演算法,按照 key的第一個字元的字典順序升序排列,即預設排序
*/
static public class MySort1 extends WritableComparator {
public MySort1() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
if (a == null || b == null)
return 0;
char c1 = a.toString().charAt(0);
char c2 = b.toString().charAt(0);
return c1 - c2;
}
}
/**
* 分組 排序演算法,key的第一個字元與3的餘數相同的key分為一組
*/
static public class MyGroupSort extends WritableComparator {
public MyGroupSort() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
char c1 = a.toString().charAt(0);
char c2 = b.toString().charAt(0);
return (c1 % 3) - (c2 % 3);
}
}
/**
* 分組 排序演算法,直接返回0,即 把所有 記錄整合成一個
*/
static public class MyGroupSort1 extends WritableComparator {
public MyGroupSort1() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
return 0;
}
}
/**
* 分組 排序演算法, 按照key的第一個字元是否相等進行分組,即預設分組
*/
static public class MyGroupSort2 extends WritableComparator {
public MyGroupSort2() {
super(Text.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
char c1 = a.toString().charAt(0);
char c2 = b.toString().charAt(0);
return c1 - c2;
}
}
public static void main(String[] args) {
Configuration conf = new Configuration();
try {
Job job = new Job(conf, "Word Count");
job.setJarByClass(WordCount.class);
job.setMapperClass(MapImpl.class);
job.setReducerClass(ReducerImpl.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
Path inpath = new Path(args[0]);
Path outpath = new Path(args[1]);
FileSystem fs = FileSystem.get(conf);
if (!fs.exists(inpath)) {
System.out.println("輸入路徑 " + args[0] + " 不存在");
}
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
/*
* 排序演算法,有7個不同的搭配組合,通過這些組合可以理解
* mapreduce的排序。
*/
if (Integer.valueOf(args[2]) == 1) {
System.out.println("==============1=================");
job.setSortComparatorClass(MySort.class);
}
if (Integer.valueOf(args[2]) == 2) {
System.out.println("======================2=============");
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroupSort.class);
}
if (Integer.valueOf(args[2]) == 3) {
System.out.println("======================3===========");
job.setGroupingComparatorClass(MyGroupSort.class);
}
if (Integer.valueOf(args[2]) == 4) {
System.out.println("======================4=============");
// job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroupSort1.class);
}
if (Integer.valueOf(args[2]) == 5) {
System.out.println("==============5=================");
job.setSortComparatorClass(MySort.class);
job.setGroupingComparatorClass(MyGroupSort2.class);
}
if (Integer.valueOf(args[2]) == 6) {
System.out.println("======================6=============");
job.setSortComparatorClass(MySort1.class);
job.setGroupingComparatorClass(MyGroupSort.class);
}
/**
* 分割槽
*/
job.setPartitionerClass(myPartition.class);
job.setNumReduceTasks(3);
FileInputFormat.addInputPath(job, inpath);
FileOutputFormat.setOutputPath(job, outpath);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
實驗資料
A
1
B
C
D
2
E
F
G
H
3
I
J
4
K
5
6
L
M
N
O
P
Q
R
7
S
T
8
U
V
W
X
Y
Z
9