
Secondary Sort in MapReduce

Tags: big data, hadoop, secondary sort, mapreduce

  • Part 1: Understand what secondary sort does and describe it in your own words (covering custom data types, partitioning, grouping and sorting).
  • Part 2: Implement secondary sort and provide the source files.
  • Part 3: Understand the different ways of doing a MapReduce join, implement a reduce-side join, provide the source code and explain the approach.

Part 1: Secondary sort, described in my own words (custom data types, partitioning, grouping and sorting)

1.1 What secondary sort does

   1. When a client submits a job, it is handed over to the YARN framework: the ResourceManager accepts the job and assigns work to NodeManagers; a NodeManager launches the ApplicationMaster, and containers are allocated as an abstraction of the task's runtime environment, encapsulating CPU, memory and other resources together with environment variables, launch commands and other task information. The input data is divided into InputSplits; the Mapper base class turns each split into key-value pairs and map() is called on them; the map output then goes through the shuffle phase and becomes the input of the reduce side, and after the reduce phase the result is written to HDFS. Secondary sort means sorting first by the first field and then, for rows whose first field is equal, sorting by the second field (a small illustration follows after this list).
   2. During the shuffle, the data goes through spill, partition, sort, combine, compress and group steps before it reaches the reduce side.
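
A hypothetical illustration: assuming input lines of the form first,second (the format the mapper in section 2.2 parses), and ignoring how rows are spread across the two reducers' output files, the job emits the first field and the second field (tab-separated by default), ordered by the first field and, within equal first fields, by the second field:

input          output
b,3            a    1
a,7            a    7
a,1            b    3
b,10           b    10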

1.2 Shuffle-related settings on the Job

    1) partitioner
            job.setPartitionerClass(FirstPartitioner.class);
    2) sort  (a sketch of an explicit sort comparator follows this list)
            job.setSortComparatorClass(cls);
    3) combine
            job.setCombinerClass(cls);
    4) compress
            set by configuration
    5) group
            job.setGroupingComparatorClass(FirstGroupingComparator.class);
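
An explicit sort comparator is not actually needed here: the driver in section 2.2 leaves setSortComparatorClass commented out because the composite key PairWritable implements compareTo(), and the default comparator falls back to it. If one were wanted anyway, a minimal sketch (assuming the PairWritable class from section 2.2; the class name PairSortComparator is illustrative) could simply delegate to that compareTo() and be registered with job.setSortComparatorClass(PairSortComparator.class):

package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical sort comparator, shown only to illustrate the
// job.setSortComparatorClass(...) hook; it is not part of the original job.
public class PairSortComparator extends WritableComparator {

    protected PairSortComparator() {
        // true -> let WritableComparator deserialize PairWritable instances
        // so the object-level compare below can be used
        super(PairWritable.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // delegate to PairWritable.compareTo(): first ascending, then second ascending
        return ((PairWritable) a).compareTo((PairWritable) b);
    }
}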

Part 2: Implement secondary sort and provide the source files

2.1 Secondary sort approach

  1. Rely on MapReduce's default sorting of map output keys for the first-level sort.
  2. Combine the original key with the second field that needs to be sorted into a composite key (PairWritable.java below).

2.2 Java code for secondary sort

SecondarySortMapReduce.java

package org.apache.hadoop.studyhadoop.sort;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author zhangyy
 *
 */
public class SecondarySortMapReduce extends Configured implements Tool{

    // step 1: mapper class
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class SecondarySortMapper extends //
        Mapper<LongWritable,Text,PairWritable,IntWritable>{

        private PairWritable mapOutputKey = new PairWritable() ;
        private IntWritable mapOutputValue = new IntWritable() ;

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // line value
            String lineValue = value.toString();
            // split
            String[] strs = lineValue.split(",") ;
            // invalidate
            if(2 != strs.length){
                return ;
            }

            // set map  output key and value
            mapOutputKey.set(strs[0], Integer.valueOf(strs[1]));
            mapOutputValue.set(Integer.valueOf(strs[1]));

            // output
            context.write(mapOutputKey, mapOutputValue);
        }
    }

    // step 2: reducer class
    /**
     * public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
     */
    public static class SecondarySortReducer extends //
        Reducer<PairWritable,IntWritable,Text,IntWritable>{
        private Text outputKey = new Text() ;

        @Override
        public void reduce(PairWritable key, Iterable<IntWritable> values,
                Context context)
                throws IOException, InterruptedException {
            // set output key
            outputKey.set(key.getFirst());

            // iterator
            for(IntWritable value : values){
                // output
                context.write(outputKey, value);
            }
        }
    }

    // step 3: driver
    public int run(String[] args) throws Exception {
        // 1: get configuration
        Configuration configuration = super.getConf() ;

        // 2: create job
        Job job = Job.getInstance(//
            configuration, //
            this.getClass().getSimpleName()//
        );
        job.setJarByClass(this.getClass());

        // 3: set job
        // input  -> map  -> reduce -> output
        // 3.1: input
        Path inPath = new Path(args[0]) ;
        FileInputFormat.addInputPath(job, inPath);

        // 3.2: mapper
        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(PairWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

// ===========================Shuffle======================================     
        // 1) partitioner
        job.setPartitionerClass(FirstPartitioner.class);
        // 2) sort
//              job.setSortComparatorClass(cls);
        // 3) combine
//              job.setCombinerClass(cls);
        // 4) compress
            // set by configuration
        // 5) group
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
// ===========================Shuffle======================================     

        // 3.3: reducer
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(Text.class);   // the reducer emits a Text key, not IntWritable
        job.setOutputValueClass(IntWritable.class);
        // set reducer number
        job.setNumReduceTasks(2);

        // 3.4: output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4: submit job 
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1 ;
    }

    public static void main(String[] args) throws Exception {
        args = new String[]{
                "hdfs://namenode01.hadoop.com:8020/input/sort" ,//
                "hdfs://namenode01.hadoop.com:8020/output"
            };      

        // create configuration
        Configuration configuration = new Configuration();

        // run job
        int status = ToolRunner.run(//
            configuration, //
            new SecondarySortMapReduce(), //
            args
        ) ;

        // exit program
        System.exit(status);
    }
}

PairWritable.java

package org.apache.hadoop.studyhadoop.sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PairWritable implements WritableComparable<PairWritable> {

    private String first;
    private int second;

    public PairWritable() {
    }

    public PairWritable(String first, int second) {
        this.set(first, second);
    }

    public void set(String first, int second) {
        this.first = first;
        this.setSecond(second);
    }

    public String getFirst() {
        return first;
    }

    public void setFirst(String first) {
        this.first = first;
    }

    public int getSecond() {
        // undo the offset applied in setSecond() so callers see the original value
        return second - Integer.MAX_VALUE;
    }

    public void setSecond(int second) {
        // store the value offset by Integer.MAX_VALUE (int overflow wraps around);
        // getSecond() reverses the offset, so the round trip is exact
        this.second = second + Integer.MAX_VALUE;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(first);
        out.writeInt(second);
    }

    public void readFields(DataInput in) throws IOException {
        this.first = in.readUTF();
        this.second = in.readInt();
    }

    public int compareTo(PairWritable o) {
        // compare the first field
        int comp = this.first.compareTo(o.getFirst());

        // if the first fields differ, that decides the order
        if (0 != comp) {
            return comp;
        }

        // otherwise order by the second field
        return Integer.valueOf(this.getSecond()).compareTo(Integer.valueOf(o.getSecond()));
    }

}
FirstPartitioner.java

package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class FirstPartitioner extends Partitioner<PairWritable,IntWritable> {

    @Override
    public int getPartition(PairWritable key, IntWritable value,
            int numPartitions) {
        return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}

FirstGroupingComparator.java

package org.apache.hadoop.studyhadoop.sort;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;

public class FirstGroupingComparator implements RawComparator<PairWritable> {

    // object compare
    public int compare(PairWritable o1, PairWritable o2) {
        return o1.getFirst().compareTo(o2.getFirst());
    }

    // raw bytes compare: a serialized PairWritable is writeUTF(first) followed by
    // writeInt(second) (4 bytes), so skipping the last 4 bytes compares only the
    // first field; note the buffer offsets s1/s2 must be honoured
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return WritableComparator.compareBytes(b1, s1, l1 - 4, b2, s2, l2 - 4);
    }

}

2.3 Output test

Upload the input data:
 hdfs dfs -put sort /input

Run output: (screenshots in the original post)


Part 3: Understand the different ways of doing a MapReduce join, implement a reduce-side join, provide the source code and explain the approach

3.1 The three kinds of MapReduce join

   3.1.1 Map-side join
   In the map phase a single map task cannot see every field needed for the join, because records with the same key may sit in different map inputs; a reduce-side join is therefore expensive, since the shuffle phase has to move a large amount of data across the network.
A map-side join is an optimisation for the case where one of the two tables to be joined is very large and the other is small enough to fit entirely in memory. The small table is replicated to every map task (for example loaded into a hash table), and the map task only scans the big table: for each key/value record of the big table it looks the key up in the hash table and, if there is a match, emits the joined record. Hadoop provides the DistributedCache mechanism to distribute the small file to all tasks. A sketch of such a mapper is given after this list.
   3.1.2 Reduce-side join
   In the map phase, the map function reads both files, File1 and File2, and tags each record with its source, for example tag=0 for records from File1 and tag=2 for records from File2; the main job of the map phase is simply to tag records from the different files.
In the reduce phase, the reduce function receives, for each key, the value list coming from both File1 and File2 and joins them (a Cartesian product of the File1 and File2 values for that key); the actual join work happens on the reduce side.
   3.1.3 Semi-join
   A semi-join (half join) is a technique borrowed from distributed databases. The motivation: in a reduce-side join the cross-machine data transfer is huge and becomes the bottleneck of the join; if records that cannot participate in the join are filtered out on the map side, a lot of network I/O is saved.
The implementation is simple: take the small table, say File1, and extract its join keys into a file File3, which is usually small enough to fit in memory. In the map phase, use the DistributedCache to copy File3 to every TaskTracker and filter out the File2 records whose key does not appear in File3; the remaining work on the reduce side is the same as a reduce-side join.
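
Since only the reduce-side join is implemented in section 3.2, the sketch below shows the map-side join described in 3.1.1. It assumes the driver has added the small customers file to the distributed cache with Job.addCacheFile(...), that the records use the comma-separated, customer-id-first layout of section 3.2, and that no reduce phase is needed; the class and field names are illustrative and not part of the original code.

package org.apache.hadoop.studyhadoop.join;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical map-side join mapper: the small "customers" table is loaded into
// memory from the distributed cache, the large "orders" table is the normal map
// input, and the join happens entirely in map() with no reducer.
public class MapSideJoinMapper extends
        Mapper<LongWritable, Text, NullWritable, Text> {

    private final Map<String, String> customerById = new HashMap<String, String>();
    private final Text outputValue = new Text();

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // the driver is assumed to have called job.addCacheFile(customersFileUri)
        URI[] cacheFiles = context.getCacheFiles();
        if (null == cacheFiles || 0 == cacheFiles.length) {
            return;
        }
        FileSystem fs = FileSystem.get(cacheFiles[0], context.getConfiguration());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                fs.open(new Path(cacheFiles[0]))));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                // customer record: cid,name,phone -> keep "name,phone" keyed by cid
                String[] fields = line.split(",", 2);
                if (2 == fields.length) {
                    customerById.put(fields[0], fields[1]);
                }
            }
        } finally {
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // order record: cid,... -> look the cid up in the in-memory hash table
        String[] fields = value.toString().split(",", 2);
        if (2 != fields.length) {
            return;
        }
        String customer = customerById.get(fields[0]);
        if (null != customer) {
            // inner join: emit cid, the customer's fields, then the order's fields
            outputValue.set(fields[0] + "," + customer + "," + fields[1]);
            context.write(NullWritable.get(), outputValue);
        }
    }
}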

3.2 Code

DataJoinMapReduce.java

package org.apache.hadoop.studyhadoop.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 
 * @author zhangyy
 * 
 */
public class DataJoinMapReduce extends Configured implements Tool {
    // step 1 : mapper
    /**
     * public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
     */
    public static class WordCountMapper extends //
            Mapper<LongWritable, Text, LongWritable, DataJoinWritable> {

        private LongWritable mapOutputKey = new LongWritable();
        private DataJoinWritable mapOutputValue = new DataJoinWritable();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // split
            String[] strs = value.toString().split(",");

            // invalidate
            if ((3 != strs.length) && (4 != strs.length)) {
                return;
            }

            // set mapoutput key
            Long cid = Long.valueOf(strs[0]);
            mapOutputKey.set(cid);

            // set name
            String name = strs[1];

            // customer
            if (3 == strs.length) {
                String phone = strs[2];
                mapOutputValue.set("customer", name + "," + phone);
            }
            // order
            if (4 == strs.length) {
                String price = strs[2];
                String date = strs[3];
                mapOutputValue.set("order", name + "," + price + "," + date);
            }

            context.write(mapOutputKey, mapOutputValue);
        }

    }

    // step 2 : reducer
    public static class WordCountReducer extends //
            Reducer<LongWritable, DataJoinWritable, NullWritable, Text> {

        private Text outputValue = new Text();

        @Override
        public void reduce(LongWritable key, Iterable<DataJoinWritable> values,
                Context context) throws IOException, InterruptedException {

            String customerInfo = "";
            List<String> orderList = new ArrayList<String>();

            for (DataJoinWritable value : values) {
                if ("customer".equals(value.getTag())) {
                    customerInfo = value.getData();
                } else if ("order".equals(value.getTag())) {
                    orderList.add(value.getData());
                }
            }

            for (String order : orderList) {
                outputValue.set(key.toString() + "," + customerInfo + ","
                        + order);
                context.write(NullWritable.get(), outputValue);
            }

        }
    }

    // step 3 : job

    public int run(String[] args) throws Exception {

        // 1 : get configuration
        Configuration configuration = super.getConf();

        // 2 : create job
        Job job = Job.getInstance(//
                configuration,//
                this.getClass().getSimpleName());
        job.setJarByClass(DataJoinMapReduce.class);

        // job.setNumReduceTasks(tasks);

        // 3 : set job
        // input --> map --> reduce --> output
        // 3.1 : input
        Path inPath = new Path(args[0]);
        FileInputFormat.addInputPath(job, inPath);

        // 3.2 : mapper
        job.setMapperClass(WordCountMapper.class);
        // TODO
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(DataJoinWritable.class);

        // ====================shuffle==========================
        // 1: partition
        // job.setPartitionerClass(cls);
        // 2: sort
        // job.setSortComparatorClass(cls);
        // 3: combine
        // job.setCombinerClass(cls);
        // 4: compress
        // set by configuration
        // 5 : group
        // job.setGroupingComparatorClass(cls);

        // ====================shuffle==========================

        // 3.3 : reducer
        job.setReducerClass(WordCountReducer.class);
        // TODO
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 3.4 : output
        Path outPath = new Path(args[1]);
        FileOutputFormat.setOutputPath(job, outPath);

        // 4 : submit job
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;

    }

    public static void main(String[] args) throws Exception {

        args = new String[] {
                "hdfs://namenode01.hadoop.com:8020/join",
                "hdfs://namenode01.hadoop.com:8020/output3/" 
                };

        // get configuration
        Configuration configuration = new Configuration();

        // configuration.set(name, value);

        // run job
        int status = ToolRunner.run(//
                configuration,//
                new DataJoinMapReduce(),//
                args);

        // exit program
        System.exit(status);
    }

}
DataJoinWritable.java

package org.apache.hadoop.studyhadoop.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class DataJoinWritable implements Writable {

    private String tag ;
    private String data ;

    public DataJoinWritable() {

    }

    public DataJoinWritable(String tag, String data) {
        this.set(tag, data); 
    }
    public void set(String tag, String data) {
        this.setTag(tag);
        this.setData(data);
    }
    public String getTag() {
        return tag;
    }

    public void setTag(String tag) {
        this.tag = tag;
    }

    public String getData() {
        return data;
    }

    public void setData(String data) {
        this.data = data;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((data == null) ? 0 : data.hashCode());
        result = prime * result + ((tag == null) ? 0 : tag.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        DataJoinWritable other = (DataJoinWritable) obj;
        if (data == null) {
            if (other.data != null)
                return false;
        } else if (!data.equals(other.data))
            return false;
        if (tag == null) {
            if (other.tag != null)
                return false;
        } else if (!tag.equals(other.tag))
            return false;
        return true;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.getTag());
        out.writeUTF(this.getData());
    }

    public void readFields(DataInput in) throws IOException {
        this.setTag(in.readUTF());
        this.setData(in.readUTF());
    }

    @Override
    public String toString() {
        return tag + "," + data ;
    }

}

3.3 Running the code

Upload the input files (a hypothetical record layout follows the commands):
hdfs dfs -put customers.txt /join 
hdfs dfs -put orders.txt /join
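
The mapper in 3.2 tells the two files apart by field count (3 fields = customer record, 4 fields = order record), so the inputs are assumed to look roughly like the hypothetical lines below; the reducer emits one line per order, prefixed with the customer id and the customer's fields:

customers.txt : cid,name,phone                        e.g.  1,alice,555-0100
orders.txt    : cid,product,price,date                e.g.  1,shoes,80,2016-01-01
joined output : cid,name,phone,product,price,date     e.g.  1,alice,555-0100,shoes,80,2016-01-01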

Run result: (screenshots in the original post)

