1. 程式人生 > >hadoop 的排序:key排序和分組排序

hadoop 的排序:key排序和分組排序

引入和說明

熟悉mapreduce執行流程的都知道,mapreduce流程中,一共有兩類排序,第一種是對於key的排序,預設是是根據key的遞增排序。第二種是對於reduce的組排序,將兩條記錄的key帶入到 分組函式中,如果返回0,則說明兩個記錄是一組的,所以就把他們的value合到一個迭代器中,也就是reduce函式的第二個引數。

最下面那個程式碼(小標題是總體程式碼)包括了 map ,reduce,兩個key排序類,三個分組排序類,一個分割槽類,和一個主函式。也就是整個文章的所有程式碼。下面小片程式碼都是從裡面提取出來的,建議先看一下最後那個程式碼,對整個程式碼有一個完整性認識。
最後那個程式碼段是整個程式用的實驗資料,其實就是A-Z 和1-9順序的穿插在一起。

分割槽

分割槽的程式碼如下

    static public class myPartition extends Partitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            char b = key.toString().charAt(0);
            if (b >= 'A' && b <= 'Z'
) { return 0; } if (b >= '0' && b <= '9') return 1; return 2; } }

這個分割槽函式很簡單,就是將記錄按照字母,數字和其他三種類型分別分到不同的是分割槽,即每次結果檔案中會包括四個檔案,分別是_SUCCESS、part-r-00000、part-r-00001、part-r-00002。part-r-00000儲存key是字母的記錄,part-r-00001儲存key是數字的記錄,part-r-00002儲存key是其他的記錄,因為資料都是我們自己寫的,不會出現其他這種情況,所以以下文章都值看part-r-00000和part-r-00001兩個檔案。具體的分割槽效果在下面展示。

第0種組合—預設排序

執行jar時,輸入第三個引數為0,即兩個排序演算法都不設定,用預設的即可,則結果檔案如下

#  part-r-00000檔案
A   1
B   1
C   1
D   1
E   1
F   1
G   1
H   1
I   1
J   1
K   1
L   1
M   1
N   1
O   1
P   1
Q   1
R   1
S   1
T   1
U   1
V   1
W   1
X   1
Y   1
Z   1

#  part-r-00001檔案
1   1
2   1
3   1
4   1
5   1
6   1
7   1
8   1
9   1

從上面看出,分割槽的效果,即把字母和數字分開到兩個結果檔案中。
而且,從結果檔案可以看出,預設的排序演算法就是按字典順序升序排列。而分組排序顯示不出來,但是,原始碼顯示,分組排序跟key排序是一樣的。

第一種組合

設定程式碼如下

    if (Integer.valueOf(args[2]) == 1) {
                System.out.println("==============1=================");
                job.setSortComparatorClass(MySort.class);
            }

MySort類的作用按照 key的第一個字元與3的餘數升序排列,具體程式碼可以看倒數第二個程式碼段。
實驗結果如下

#  part-r-00000
N   9
L   8
D   9

#  part-r-00001
3   3
4   3
8   3

是不是很奇怪是為什麼是這種結果,其實原因是如果只設置了key排序演算法而沒有設定分組排序,那麼分組排序也使用key排序的演算法。如果我們只想設定key排序演算法,那麼我們還要設定把分組排序設定成預設排序。但是我不知道hadoop有沒有預設排序的實現,所以我自己完成了一個預設排序,即按照字典排序升序排列(MyGroupSort2類)。這種實現是第五種組合,可以先去第五種組合那看,本文章本來就是穿插著寫的,所以也應該穿插著看。

第二種組合

if (Integer.valueOf(args[2]) == 2) {
                System.out.println("======================2=============");
                job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroupSort.class);
            }

這個組合設定了分組函式,設定的分組函式作用是 key的第一個字元與3的餘數相同的key分為一組,其實,這個函式與第1種組合的key排序實現一樣,我只不過是為了驗證,當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。也就是說,如果這種組合的結果與第1種的結果一模一樣,這麼這個結論就可以完全證實了。
下面是這種組合的結果

#  part-r-00000
N   9
L   8
D   9

#  part-r-00001
3   3
4   3
8   3

可以看出,第2種組合和第1種組合的結果一模一樣,當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。這個結論驗證。

下面我們還要驗證另一個結論,就是如果只設置分組排序,而不設定key排序,那麼key排序會不會使用分組排序的函式。這個驗證可以看第三種組合

第三種組合

            if (Integer.valueOf(args[2]) == 3) {
                System.out.println("======================3===========");
                job.setGroupingComparatorClass(MyGroupSort.class);
            }

執行結果如下

#  part-r-00000
A   1
B   1
C   1
D   1
E   1
F   1
G   1
H   1
I   1
J   1
K   1
L   1
M   1
N   1
O   1
P   1
Q   1
R   1
S   1
T   1
U   1
V   1
W   1
X   1
Y   1
Z   1

#  part-r-00001
1   1
2   1
3   1
4   1
5   1
6   1
7   1
8   1
9   1

這個結果跟第0種組合(也就是預設設定)的結果一樣而不與第2種組合的結果相同,也就是說如果只設置分組排序,而不設定key排序,那麼key排序是不會使用分組排序的函式的,而且還可以推出一個更重要的結論。具體論述如下。
因為我們設定了分組排序,但是從結果來看,分組排序顯然沒有起到作用,因為那些ASCII碼和3的餘數相同的key並沒有合併到一起,這是為什麼呢,其實並不是分組排序沒有起到作用,而是分組排序的一個特性,這個特性就是,分組排序時,某一個key先與下一個key做分組排序,如果返回0,則說明這兩個key是相同的,就合併到一個迭代器中,然後用下一個key與下下個key做分組排序;如果返回的不是0,則說明這兩個key不一樣,也就不合併到同一個迭代器中,而且,這個key也不再和下下個key做分組排序了。例如,1的ASCII碼是49,與3的餘數為1,而2的ASCII碼是50,與3的餘數為2,這兩個key不一樣,而且,1也不再和3 4 5 …繼續比下去,也就是把 1 單獨列出來,作為一個迭代器,而且,key排序之後,key輸入的順序是 1 2 3 4 5 6 7 8 9 ,他們的ASCII碼與3的餘數與下一個key的餘數都是不一樣的,所以也就導致了part-r-00001檔案中顯示的那樣。hadoop之所以使用這種機制,是因為本來已經有了key排序,如果你想要把他們分到一組去,你就可以使用key排序把他們分到一起去,再使用分組函式把他們分到一個迭代器中。完全不用讓分組排序使用一 一對比的方式來分組,因為這樣很浪費資源。
也許你也會想,也許就是分組排序沒起到作用呢,所以我做了另一個實驗,來證明,如果只設置分組排序,而不設定key排序,那麼分組排序還是會起作用的,請看第4中組合。

第四種組合

    if (Integer.valueOf(args[2]) == 4) {
                System.out.println("======================4=============");
                job.setGroupingComparatorClass(MyGroupSort1.class);
            }

MyGroupSort1函式只返回0,所以不管傳入什麼,得到的結果都是,這兩個key是一樣的,應該分到一個組,加入到一個迭代器中,
執行結果如下:

#  part-r-00000
Z   26

#  part-r-00001
9   9

看到了把,如果只設置分組排序,不設定key排序,那麼分組排序還是會起到作用的。就像這個組合,因為分組排序只返回0,所以會把所有記錄分到一個迭代器中,而且 z 和 9都是每組的最後一個,記錄個數分別是 26 和 9。所以就產生了以上這種執行結果。
下面可以去看最後一個組合,第6種組合,

第五種組合

            if (Integer.valueOf(args[2]) == 5) {
                System.out.println("==============5=================");
                job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroupSort2.class);
            }

執行結果如下:

#  part-r-00000
Z   1
E   1
K   1
H   1
B   1
T   1
Q   1
W   1
N   1
X   1
F   1
C   1
R   1
U   1
I   1
O   1
L   1
A   1
Y   1
V   1
S   1
P   1
M   1
J   1
G   1
D   1

#  part-r-00001
6   1
9   1
3   1
1   1
7   1
4   1
5   1
2   1
8   1

這樣就跟我們預想的一樣了,先看part-r-00001檔案,因為 6 9 3的ASCII碼分別是 54 57 51,他們的ASCII碼與3的餘數都是0,而1 7 4的ASCII碼分別是 49 55 52, 與3的餘數是1,而5 2 8的ASCII碼分別是 53 50 56,與3的餘數是2,所以就成了上面顯示的那種排序方式。part-r-00000檔案的排序方式也是如此。如 Z的ASCII碼為90,90與3的餘數為0,所以排在前面,剩下的那些以此類推。
現在反過來看一下為什麼第1種組合所產生的結果。先看part-r-00001檔案,他顯示的是3 4 8,其中,他們的ASCII碼與3的餘數分別是 0 1 2。這就說明了一件事,分組排序是使用key排序的演算法。例如,6 9 3的ASCII碼與3的餘數都是0,所以就會合併到同一個迭代器中,這也就是為什麼part-r-00001檔案中3後面那個是3,也就是迭代器中有三個元素。再例如。1 7 4 的ASCII碼與3的餘數都是1,所以把他們合併到同一個迭代器中,part-r-00001檔案中 4 後面也是一個3。至於 合併之後,key是用誰的key,part-r-00001檔案中,6 9 3 裡的3,1 7 4裡的4 ,5 2 8裡的8,結論就是合併之後,用的是整個組的最後一個key。但是我在學習時。老師講的是,用整個組的最前面的一個key,我也不知道為什麼會出現矛盾,如果誰知道,請留言給我,多謝多謝。
part-r-00000檔案是這個樣子也就可以明白了,Z E K H B T Q W N的ASCII碼與3的餘數都是0,所以合併到同一個迭代器中,個數有9個,而N是整組的最後一個, part-r-00000檔案的第一行為 N 9,而第二行L 8 和第三行 D 9也是這麼來的,希望你看明白了,我已經盡了最大努力了。下面你可以看一下第二種組合。

第六種組合

            if (Integer.valueOf(args[2]) == 6) {
                System.out.println("======================6=============");
                job.setSortComparatorClass(MySort1.class);
                job.setGroupingComparatorClass(MyGroupSort.class);
            }

這個組合其實和第三個組合一樣,只不過第三個組合的key排序使用的是程式預設的,而這個組合使用的是自己實現的key排序函式,其實,自己實現的key排序函式跟預設的排序函式一樣,都是按照 key的第一個字元的字典順序升序排列。這個組合的結果如下

#  part-r-00000
A   1
B   1
C   1
D   1
E   1
F   1
G   1
H   1
I   1
J   1
K   1
L   1
M   1
N   1
O   1
P   1
Q   1
R   1
S   1
T   1
U   1
V   1
W   1
X   1
Y   1
Z   1

#  part-r-00001
1   1
2   1
3   1
4   1
5   1
6   1
7   1
8   1
9   1

結果如第3種組合的結果一模一樣,我也就不多做解釋了,就是證明,預設的key排序函式就是按照 key的字典順序升序排列。

結論總結

下面是這篇文章所有的結論
1. 預設的key排序和分組排序都是按照 key的字典順序升序排列。(第0種和第6種組合證明)
2. 當設定了key排序而沒有設定分組排序是,分組排序使用key排序的函式。(1,2)
3. 如果只設置分組排序,而不設定key排序,那麼key排序是不會使用分組排序的函式的。(3)
4. 分組排序時,某一個key先與下一個key做分組排序,如果返回0,則說明這兩個key是相同的,就合併到一個迭代器中,然後用下一個key與下下個key做分組排序;如果返回的不是0,則說明這兩個key不一樣,也就不合併到同一個迭代器中,而且,這個key也不再和下下個key做分組排序了。(3)

總體程式碼

public class WordCount {
    private static final Log LOG = LogFactory.getLog("MyLog");

    public static class MapImpl extends
            Mapper<LongWritable, Text, Text, IntWritable> {

        enum ERRORROW {
            error_field, error_row
        }

        private static Text k = new Text();
        private static IntWritable v = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String valuestr = value.toString();
            String[] splits = StringUtils.split(valuestr, ' ');
            k.set(splits[0]);
            context.write(k, v);
        }
    }

    public static class ReducerImpl extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private static IntWritable v = new IntWritable();

        @Override
        protected void reduce(Text arg0, Iterable<IntWritable> arg1,
                Context arg2) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable intWritable : arg1) {
                int i = intWritable.get();
                sum += i;
            }
            v.set(sum);
            arg2.write(arg0, v);
        }
    }

    static public class myPartition extends Partitioner<Text, IntWritable> {

        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            char b = key.toString().charAt(0);
            if (b >= 'A' && b <= 'Z') {
                return 0;
            }
            if (b >= '0' && b <= '9')
                return 1;
            return 2;
        }

    }

    /**
     * key 排序演算法,按照 key的第一個字元與3的餘數升序排列
     */
    static public class MySort extends WritableComparator {

        public MySort() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a == null || b == null)
                return 0;
            char c1 = a.toString().charAt(0);
            char c2 = b.toString().charAt(0);

            return c1 % 3 - c2 % 3;
        }

    }

    /**
     * key 排序演算法,按照 key的第一個字元的字典順序升序排列,即預設排序
     */
    static public class MySort1 extends WritableComparator {

        public MySort1() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a == null || b == null)
                return 0;
            char c1 = a.toString().charAt(0);
            char c2 = b.toString().charAt(0);

            return c1 - c2;
        }

    }

    /**
     * 分組 排序演算法,key的第一個字元與3的餘數相同的key分為一組
     */
    static public class MyGroupSort extends WritableComparator {
        public MyGroupSort() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            char c1 = a.toString().charAt(0);
            char c2 = b.toString().charAt(0);
            return (c1 % 3) - (c2 % 3);
        }

    }


    /**
     * 分組 排序演算法,直接返回0,即 把所有 記錄整合成一個
     */
    static public class MyGroupSort1 extends WritableComparator {

        public MyGroupSort1() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return 0;
        }

    }

    /**
     * 分組 排序演算法, 按照key的第一個字元是否相等進行分組,即預設分組
     */
    static public class MyGroupSort2 extends WritableComparator {

        public MyGroupSort2() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            char c1 = a.toString().charAt(0);
            char c2 = b.toString().charAt(0);

            return c1 - c2;
        }

    }

    public static void main(String[] args) {

        Configuration conf = new Configuration();

        try {
            Job job = new Job(conf, "Word Count");

            job.setJarByClass(WordCount.class);

            job.setMapperClass(MapImpl.class);
            job.setReducerClass(ReducerImpl.class);

            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            Path inpath = new Path(args[0]);
            Path outpath = new Path(args[1]);

            FileSystem fs = FileSystem.get(conf);

            if (!fs.exists(inpath)) {
                System.out.println("輸入路徑 " + args[0] + " 不存在");
            }
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }

            /*
             * 排序演算法,有7個不同的搭配組合,通過這些組合可以理解                         
             * mapreduce的排序。
             */ 
            if (Integer.valueOf(args[2]) == 1) {
                System.out.println("==============1=================");
                job.setSortComparatorClass(MySort.class);
            }

            if (Integer.valueOf(args[2]) == 2) {
                System.out.println("======================2=============");
                job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroupSort.class);
            }
            if (Integer.valueOf(args[2]) == 3) {
                System.out.println("======================3===========");
                job.setGroupingComparatorClass(MyGroupSort.class);
            }
            if (Integer.valueOf(args[2]) == 4) {
                System.out.println("======================4=============");
                // job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroupSort1.class);
            }
            if (Integer.valueOf(args[2]) == 5) {
                System.out.println("==============5=================");
                job.setSortComparatorClass(MySort.class);
                job.setGroupingComparatorClass(MyGroupSort2.class);
            }
            if (Integer.valueOf(args[2]) == 6) {
                System.out.println("======================6=============");
                job.setSortComparatorClass(MySort1.class);
                job.setGroupingComparatorClass(MyGroupSort.class);
            }

            /**
             * 分割槽
             */
            job.setPartitionerClass(myPartition.class);
            job.setNumReduceTasks(3);

            FileInputFormat.addInputPath(job, inpath);
            FileOutputFormat.setOutputPath(job, outpath);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);

        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

實驗資料

A
1
B
C
D
2
E
F
G
H
3
I
J
4
K
5
6
L
M
N
O
P
Q
R
7
S
T
8
U
V
W
X
Y
Z
9