1. 程式人生 > >MapReduce程序之數據排序

MapReduce程序之數據排序

大數據 Hadoop MapReduce Java

[toc]


MapReduce程序之數據排序

需求

下面有三個文件:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file1.csv
2
32
654
32
15
756
65223
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file2.csv
5956
22
650
92
yeyonghao@yeyonghaodeMacBook-Pro:~/data/input/sort$ cat file3.csv
26
54
6

使用MapReduce對其進行排序並輸出。

分析思路

Map階段分析:
/**
 * 數據在Map之後會做sort(從內存緩沖區到磁盤的時候會做sort),所以Map操作只需要把數據直接寫出即可,最後在本地做數據
 * 合並的時候也是會有排序的,詳細可以參考MapReduce的過程,但是需要註意的是,因為我們需要進行的是數字的排序,
 * 所以在Map輸出時,key的類型應該是Int類型才能按照數字的方式進行排序,如果是Text文本的話,那麽是按照字典順序
 * 來進行排序的(也就是先比較字符串中的第一個字符,如果相同再比較第二個字符,以此類推),而不是按照數字進行排序
 */

Reduce階段分析:
/**
 * 需要註意的是,排序與其它MapReduce程序有所不同,最後在驅動程序設置ReduceTask時,必須要設置為1
 * 這樣才能把數據都匯總到一起,另外一點,數據在shuffle到達reducer的時候,從內存緩沖區寫到磁盤時
 * 也會進行排序操作,所以即便是從不同節點上的Map上shuffle來的數據,到輸入到reducer時,數據也是有序的,
 * 所以Reducer需要做的是把數據直接寫到context中就可以了
 */ 

MapReduce程序

關於如何進行數據的排序,思路已經在代碼註釋中有說明,不過需要註意的是,這裏使用了前面開發的Job工具類來開發驅動程序,程序代碼如下:

package com.uplooking.bigdata.mr.sort;

import com.uplooking.bigdata.common.utils.MapReduceJobUtil;
import com.uplooking.bigdata.mr.duplication.DuplicationJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.IOException;

public class SortJob {

    /**
     * 驅動程序,使用工具類來生成job
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        if (args == null || args.length < 2) {
            System.err.println("Parameter Errors! Usages:<inputpath> <outputpath>");
            System.exit(-1);
        }

        Job job = MapReduceJobUtil.buildJob(new Configuration(),
                SortJob.class,
                args[0],
                TextInputFormat.class,
                SortMapper.class,
                IntWritable.class,
                NullWritable.class,
                new Path(args[1]),
                TextOutputFormat.class,
                SortReducer.class,
                IntWritable.class,
                NullWritable.class);

        // ReduceTask必須設置為1
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }

    /**
     * 數據在Map之後會做sort(從內存緩沖區到磁盤的時候會做sort),所以Map操作只需要把數據直接寫出即可,最後在本地做數據
     * 合並的時候也是會有排序的,詳細可以參考MapReduce的過程,但是需要註意的是,因為我們需要進行的是數字的排序,
     * 所以在Map輸出時,key的類型應該是Int類型才能按照數字的方式進行排序,如果是Text文本的話,那麽是按照字典順序
     * 來進行排序的(也就是先比較字符串中的第一個字符,如果相同再比較第二個字符,以此類推),而不是按照數字進行排序
     */
    public static class SortMapper extends Mapper<LongWritable, Text, IntWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // 先將value轉換為數字
            int num = Integer.valueOf(value.toString());
            // 直接寫出數據到context中
            context.write(new IntWritable(num), NullWritable.get());
        }
    }

    /**
     * 需要註意的是,排序與其它MapReduce程序有所不同,最後在驅動程序設置ReduceTask時,必須要設置為1
     * 這樣才能把數據都匯總到一起,另外一點,數據在shuffle到達reducer的時候,從內存緩沖區寫到磁盤時
     * 也會進行排序操作,所以即便是從不同節點上的Map上shuffle來的數據,到輸入到reducer時,數據也是有序的,
     * 所以Reducer需要做的是把數據直接寫到context中就可以了
     */
    public static class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // 直接將數據寫入到context中
            context.write(key, NullWritable.get());
        }
    }
    /**
     * 仍然需要說明的是,因為reduce端在shuffle數據寫入到磁盤的時候已經完成了排序,
     * 而這個排序的操作不是在reducer的輸出中完成的,這也就意味著,reducer的輸出數據中的key數據類型,
     * 可以是IntWritable,顯然也可以設置為Text的,說明這個問題主要是要理清map-shuffle-reduce的過程
     */
}

測試

這裏使用本地環境來運行MapReduce程序,輸入的參數如下:

/Users/yeyonghao/data/input/sort /Users/yeyonghao/data/output/mr/sort

也可以將其打包成jar包,然後上傳到Hadoop環境中運行。

運行程序後,查看輸出結果如下:

yeyonghao@yeyonghaodeMacBook-Pro:~/data/output/mr/sort$ cat part-r-00000
2
6
15
22
26
32
54
92
650
654
756
5956
65223

可以看到,我們的MapReduce已經完成了數據排序的操作。

註意事項

因為在map輸出後,相同的key會被shuffle到同一個reducer中,所以這個過程其實也完成了去重的操作,這也就意味著,按照上面的MapReduce程序的思路,重復的數據也會被刪除,那麽如何解決這個問題呢?大家可以思考一下。

思路也比較簡單,可以這樣做,map輸出的時候,key還是原來的key,而value不再是NullWritalbe,而是跟key一樣的,這樣到了reducer的時候,如果有相同的數據,輸入的數據就類似於<32, [32, 32, 32]>,那麽在reducer輸出數據的時候,就可以叠代[32, 32, 32]數據進行輸出,這樣就可以避免shuffle階段key去重所帶來去除了相同數字的問題。

MapReduce程序之數據排序