1. 程式人生 > 實用技巧 >Hadoop基礎之MapReduce中Shuffle

Hadoop基礎之MapReduce中Shuffle

Hadoop基礎之MapReduce中Shuffle

Hadoop基礎之MapReduce中Shuffle

shuffle

  • maptask:map端可以執行的一個程序
  • reducetask:reduce端可以執行的一個程序
  • shuffle是介於maptask和reducetask之間的一個過程
  • shuffle可以分為map端的shuffle和reduce端的shuffle

執行過程

  • shuffle是MR處理中的一個過程,它的每一個處理步驟是分散在各個maptask和reducetask節點上完成的
  • 每個map讀取原資料的一部分(inputSplit),執行Mapper操作
  • 從Mapper端輸出的鍵值對資料進入到環形緩衝區(100M)
  • 環形緩衝區容量達到80%產生溢寫,寫入到磁碟緩衝區
  • 在磁碟緩衝區中分割槽(partitioner),排序(sort),合併(combiner)
  • 由reducetask向maptask拉取資料,進行分組排序(歸併演算法)
  • 把資料組合成(key,序列)傳送到Reducer類中執行
  • 在Reducer類中處理資料,把結果寫到檔案中

分割槽(paratitioner)

    • 根據不同的業務邏輯需求進行不同的資料分割槽,分割槽的數量與reduce數量有關
    • 分割槽數量與reduce數量的關係
      • 分割槽數量 = reduce數量
        • 每個reduce找到自己相應的分割槽
      • 分割槽數量 < reduce數量
        • reduce的數量不夠分配每個分割槽的內容,執行時會報錯
      • 分割槽數量 > reduce數量
        • 每個reduce找到自己的分割槽,但是會有剩餘reduce,浪費資源
      • 特殊情況:N個分割槽對應一個reduce
        • 所有分割槽的內容都在這個reduce處理,相當於梅分割槽
    • 設定分割槽數量
    job.setNumReduceTasks((Integer)object);
    • 預設分割槽方式
      • HashPartitioner extends Partitione<key,value>
    • 重寫分割槽方式
      • getPartitioner(key,value,numReduceTask)
    • 通過key的hashCode值和int的最大值進行與運算,對reduce數量(分割槽)取餘,餘數就是當前這個資料所在分割槽的編號
    • 自定義分割槽方式
      • 自定義類繼承Partitioner<key,value>
      • 重寫getPartition方法,定義分割槽規則
      • 規則裡,每種條件的返回值(一個整型數字)作為分割槽的編號
      • 分割槽數量在這個類中是一個確定值,要小於等於reduce數量,儘量保持一致
      • 在Driver中設定reduce的數量和要使用的自定義分割槽類
job.setPartitionerClass((Class<? extends Partitioner>) object);

排序

    • 在執行完的MapReduce結果中,key是按照自然順序(預設順序)排序的,而value不進行排序
    • 預設排序規則
      • 原有資料型別實現WritableComparable介面,重寫compareTo方法
      • 在方法中定義預設規則,升序降序方式按照返回值是正數還是負數
      • 自定義資料型別預設排序
        • 在自定義資料型別中,如果需要充當Key的泛型,必須實現Comparable介面
        • 重寫compareTo方法,在方法內定義預設排序規則
    • 自定義排序規則
      • 如果MR本身提供的資料型別或自定義序列化型別的預設排序規則無法滿足某個業務邏輯的需求的時候,需要自定義排序規則
      • 自定義排序規則值對當前的業務邏輯有效,不影響其他使用此型別的業務
      • 繼承WritableComparator類,重寫compare方法,使用構造器註冊
    public className (){
        super(排序型別.classtrue);
    }
    • 實現WritableComparable介面和繼承WritableComparator的區別
      • WritableComparable用於序列化型別的預設排序
      • WritableComparator用於序列化型別的自定義排序

      • WritableComparable是實現implements介面,有比較的資料泛型
      • WritableComparator是extends類,沒有比較的資料泛型

      • WritableComparable重寫comparaTo方法,一個引數
      • WritableComparator重寫compare方法,兩個引數

      • WritableComparable無需註冊,寫在自定義序列化型別內部,無需呼叫
      • WritableCompaarator為單獨定義類,通過構造方法註冊自己
    • 在Driver中設定自定義排序規則
    job.setSortComparatorClass((Class<? extends RawComparator>) object);

隨機抽樣排序

    • 部分排序
      • 在結果檔案中,每個檔案內的資料有序,但是全域性檔案無需,MR中正常執行的程式碼都是部分排序
    • 全排序
      • 對於所有結果檔案來說,不僅僅每個檔案內部有序,而且全域性有序
    • 如果資料中某部分特別集中,而且分割槽的標準特別難確定,就會造成reduce端計算的資料傾斜問題,導致計算時間過長

    • 隨機抽樣
      • 設定分割槽類
    job.setPartitionerClass(TotalOrderPartitioner.class);

      • 使用隨機抽樣獲取臨界點
        • 第一個引數:每個值被抽取到的概率
        • 第二個引數:抽取到的樣本數量
        • 第三個引數:切片的數量
    InputSampler.Random randomSampler = new InputSampler.RandomSampler((double)para[0],(int)para[1],(int)para[2]);
      • 把獲取的臨界點寫入partition.lst檔案
    InputSampler.writePartitionFile(job,randomSampler);
      • 如果有已知的臨界點序列化檔案,可以直接使用此檔案
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path((String)samplePara));
      • 輸入路徑要在執行隨機抽樣之前
      • 原始檔必須是序列化檔案才可以進行抽樣
    //設定檔案型別為序列化檔案
    job.setInputFormatClass(SequenceFileInputFormat.class);
    • Mapper端的KeyIn和ValueIn根據實際情況填寫
    • 序列化檔案每行都有key和value,讀取的時候可以按照key和value的型別讀取
    • 可以通過hdfs dfs -text xxx.seq(windows端 hdfs.cmd dfs -text xxx.seq)

二次排序

    • 在一個排序條件無法滿足的時候,可以選擇兩個或以上的條件同時排序
      • 預設提供的序列化型別可以在預設排序規則裡進行二次排序
      • 自定義序列化型別可以在預設排序規則和自定義排序規則裡進行二次排序
package mapReduce.join.mapJoin;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComputerSalesOrder implements WritableComparable<ComputerSalesOrder> {
    private String serialNumber = "";//流水號
    private String product = ""; //商品編號
    private Integer money = 0; //商品價格
    private String time = ""; //成交時間
    private String brand = ""; //品牌
    private String type = ""; //商品型別
    private String flag = ""; //設定標識
    public String getSerialNumber() {
        return serialNumber;
    }

    public void setSerialNumber(String serialNumber) {
        this.serialNumber = serialNumber;
    }

    public String getProduct() {
        return product;
    }

    public void setProduct(String product) {
        this.product = product;
    }

    public Integer getMoney() {
        return money;
    }

    public void setMoney(Integer money) {
        this.money = money;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }

    public String getBrand() {
        return brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getFlag() {
        return flag;
    }

    public void setFlag(String flag) {
        this.flag = flag;
    }

    @Override
    public int compareTo(ComputerSalesOrder computerSalesOrder) {
        //商品編號為排序第一序列
        if(this.product.equals(computerSalesOrder.getProduct())){
            //商品型別為第二序列
            if(this.type.equals(computerSalesOrder.getType())){
                if(this.time.equals(computerSalesOrder.getTime())){
                    return -(this.money-computerSalesOrder.getMoney());
                    //上述都滿足,按照銷售額倒敘排列
                }else{
                    return this.time.compareTo(computerSalesOrder.getTime());
                }
            }else{
                return this.type.compareTo(computerSalesOrder.getType());
            }
        }else{
            return this.product.compareTo(computerSalesOrder.getProduct());
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(serialNumber);
        out.writeUTF(product);
        out.writeInt(money);
        out.writeUTF(time);
        out.writeUTF(brand);
        out.writeUTF(type);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.serialNumber = in.readUTF();
        this.product = in.readUTF();
        this.money = in.readInt();
        this.time = in.readUTF();
        this.brand = in.readUTF();
        this.type = in.readUTF();
        this.flag = in.readUTF();
    }


    @Override
    public String toString() {
        return "流水資訊@:" +
                "流水號='" + serialNumber + '\'' +
                ",商品號='" + product + '\'' +
                ",銷售額=" + money +
                ",成交時間='" + time + '\'' +
                ",產品品牌='" + brand + '\'' +
                ",產品型別='" + type;
    }
}

倒排索引

    • 通過具體的內容,反推出這個內容的位置,這種業務關係,叫做倒排索引
    • 獲取當前讀取的檔名
String file = ((FileSplit) context.getInputSplit()).getPath().getName();

Combiner合併

  • Combiner是map端shuffle的一個部分,根據實際的業務邏輯可以使用過也可以不使用
  • 它的實際作用就是在map端執行reduce的操作
  • reduce端可能由於任務分配不均導致工作量巨大,而map端相對輕鬆,屬於map和reduce端的資料傾斜狀態,如果想保持一種均衡的狀態,就需要讓map端幫助reduce端完成部分工作
  • combiner的作用域只有當前map,耳reduce端還需要對所有map資料進行彙總

分組排序

    • 分組:按照某個特徵相同,形成一組
    • MapReduce中的執行位置:reduce端的shuffer
    • MapReduce中使用分組的時間:進入reduce端之前,按照Key相同的,value進行分組
    • 分組依據
      • 預設序列化型別:如果內容完全一致,自然形成一組
      • 自定義序列化型別(多個欄位):根據人為的需求,定義規則,設定分組條件
    • 預設的排序規則就是分組規則
    • 自定義分組規則
      • 分組類繼承WritableComparator類
      • 重寫compare方法
      • 使用構造器註冊
package mapReduce.group;


import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class GroupGSort extends WritableComparator{
    public GroupGSort(){
        super(GroupData.class,true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        GroupData groupDataA = (GroupData)a;
        GroupData groupDataB = (GroupData)b;
        return groupDataA.getRegion().compareTo(groupDataB.getRegion());
    }
}
      • driver設定分組
    job.setGroupingComparatorClass((Class<? extends RawComparator>) object);
  • 排序與分組的區別
    • 排序
      • 排序是按照某種規則,讓資料擁有一定的順序進行顯示
      • 在map端的shuffle執行
      • 在MR中,所有的key必須排序(必須有預設排序規則),value可不排序
      • 預設排序規則往上翻
    • 分組規則是按照某種規則將相同的資料分為一組
      • 分組本質上就是排序,按照排序走,如果排到相同的自然形成一組
      • 在reduce端的shuffle執行
      • 預設的排序規則就是分組規則

join業務

    • join在資料庫中表示多表連線
    • 在MapReduce中可以連線不同檔案的資料
    • 相對於join在資料庫中的實際操作來說,MapReduce中的join更多的是相當於一種思想
    • reduce端:舉個栗子,僅供參考,程式碼只有一部分,我太懶了
package mapReduce.join.reduceJoin;

import mapReduce.common.MRUtils;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ComputerSerialApp {
    public static class ComputerSerialMapper extends Mapper<LongWritable, Text,ComputerSalesOrder, NullWritable>{
        ComputerSalesOrder newKey = new ComputerSalesOrder();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //獲取資料
            String line = value.toString();
            //拆分
            String[] datas = line.split("\t");
            if(datas.length == 4){
                //長度為4判定為檔案ComputerSalesOrder
                newKey.setSerialNumber(datas[0]);
                newKey.setProduct(datas[1]);
                newKey.setMoney(Integer.parseInt(datas[2]));
                newKey.setTime(datas[3]);
                newKey.setFlag("order");

            }else if (datas.length == 3){
                //長度為3判定為檔案ComputerBrand
                newKey.setProduct(datas[0]);
                newKey.setBrand(datas[1]);
                newKey.setType(datas[2]);
                newKey.setFlag("brand");
            }
            context.write(newKey,NullWritable.get());
        }
    }
    public static class ComputerSerialReducer extends Reducer<ComputerSalesOrder,NullWritable,ComputerSalesOrder,NullWritable>{

        /**
         * ComputerSalesOrder
         * serialNumber         product         money           time        flag
         *
         * ComputerBrand        --僅僅一條
         * product              brand           type            flag
         */

        @Override
        protected void reduce(ComputerSalesOrder key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            List<ComputerSalesOrder> orderList = new ArrayList<>();
            ComputerSalesOrder brand = new ComputerSalesOrder();

            for (NullWritable value : values) {
                String flag = key.getFlag();
                if (flag.equals("order")) {
                    try {
                        ComputerSalesOrder order = new ComputerSalesOrder();
                        BeanUtils.copyProperties(order,key);
                        orderList.add(order);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }else if(flag.equals("brand")){
                    try {
                        BeanUtils.copyProperties(brand,key);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            //將名字寫入
            for (ComputerSalesOrder computerSalesOrder : orderList) {
                computerSalesOrder.setBrand(brand.getBrand());
                computerSalesOrder.setType(brand.getType());
                context.write(computerSalesOrder,NullWritable.get());
            }



        }
    }

    public static void main(String[] args) {
        String[] paths = {"E:/input/join/reduceComputerOrderAndBrand","E:/output/join/reduceComputerOrderAndBrand"};
        MRUtils.submit(ComputerSerialApp.class,paths,"sort",ComputerSalesSort.class,"group",ComputerSerialGroup.class/*,KeyValueTextInputFormat.class*/);
    }

}
    • map端:舉個栗子,依然還是懶,懂得都懂
package mapReduce.join.mapJoin;

import mapReduce.common.MRUtils;
import mapReduce.join.reduceJoin.ComputerSalesSort;
import mapReduce.join.reduceJoin.ComputerSerialGroup;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputerSerialApp {

    public static class ComputerSerialMapper extends Mapper<LongWritable, Text,Text,Text>{
        Map<String, ArrayList<String>> brandMap = new HashMap();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String brandName = context.getCacheFiles()[0].getPath().toString();
            System.out.println(brandName+"jhsdhkosdhkoashklasdjklasdhiodhjkladhjklads");
            BufferedReader bufferedReader = new BufferedReader(new FileReader(brandName));
            String line = "";
            while((line = bufferedReader.readLine()) != null){
                try {
                    String[] brand = line.split("\t");
                    ArrayList<String> list = new ArrayList<>();
                    list.add(brand[1]);
                    list.add(brand[2]);
                    brandMap.put(brand[0],list);
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
            bufferedReader.close();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] order = line.split("\t");
            String serialNumber = order[0];
            String product = order[1];
            ArrayList<String> list = brandMap.get(product);
            String brand = list.get(0);
            String type = list.get(1);
            String money = order[2];
            String time = order[3];
            context.write(new Text(serialNumber+"\t"+product+"\t"+brand+"\t"+type+"\t"+money+"\t"+time),new Text(""));
        }
    }


    public static void main(String[] args) {
        String[] paths = {"E:/input/join/reduceComputerOrderAndBrand/ComputerSalesOrder.txt","E:/output/join/mapComputerOrderAndBrand"};
        try {
            Object[] object = new Object[]{new URI("file:///E:/input/join/reduceComputerOrderAndBrand/ComputerBrand.txt"), 0};
            MRUtils.submit(ComputerSalesOrder.class,paths,object);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

}

並行度(切片)

    • reduce端的並行度
      • 通過setNumReduceTask(n)設定並行度,n為reduce的並行度
      • 每個節點都可以有0-N個reduceTask,根據測試,reduceTask的數量與DataNode的數量保持一致的時候可以達到最快的計算速度
    • map端的並行度
      • map端的並行度與reduce端的並行度毫無關係

    • 切片機制
      • 按照128M的大小對資料進行切片,與block塊保持一致
      • 每一個切片叫做InputSplit
      • 如果輸入路徑有多個檔案,每個檔案分別做切片處理
    • 輸入格式化

輸入格式化

TextInputFormat 最普通的文字文件格式,MapReduce中預設使用的格式型別
SequenceFileInputFormat 序列化檔案格式,檔案中有key和value,分別有各自的型別
KeyValueFileInputFormat 是普通的文字文件,但是每行資料都有KeyValue,型別都是Text
NLineInputFormat 普通的文字文件,打破原有的分片規則,按行分片
CombineTextInputFormat 針對小檔案處理的一種格式,為了避免每個小檔案對應一個InputSplit,導致過多mapTask產生,而每個mapTask中執行的內容特別少
自定義格式化型別

鏈式MR

  • 如果業務邏輯比較複雜的時候,可以使用多個Mapper和Reducer進行鏈式操作
  • 全域性可以有多個Mapper類,但是隻能有一個Reducer類
  • Reducer之前的Mapper類不管有多少個,本質上都屬於一個Mapper
  • Reducer之後的Mapper類,名字是Mapper,但是卻屬於Reducer
  • 本質上當前業務中只有一個Mapper和一個Reducer

計數器

  • 在業務邏輯中,如果需要對立面的一部分資料進行統計數量的時候
  • 分為Map端的計數器和Reduce端的計數器
  • Map端與Reduce端計數器寫法一致,位置和顯示有區別

壓縮

    • 在MR計算過程中,為了節省一定的空間可以使用壓縮機制
    • 壓縮需要消耗時間和資源對檔案重新編碼,屬於用時間換空間的方式
    • 是否壓縮,按照什麼方式壓縮,要根據實際情況進行選擇

壓縮的位置壓縮格式與對應編碼/解碼器壓縮效能對比、

Map前壓縮 Map後壓縮 Reducer之後壓縮
壓縮格式 hadoop自帶? 演算法 副檔名 是否可以切分 壓縮後,原程式是否需要修改
DEFAULT 是,直接使用 DEFAULT .default 和文字處理一致,不需要更改
Gzip 是,直接使用 DEFAULT .gz 和文字處理一致,不需要更改
bzip2 是,直接使用 bzip2 .bz2 和文字處理一致,不需要更改
LZO 否,需要安裝 LZO .lzo 需要建索引,還需要指定輸入格式
Snappy 否,需要安裝 Snappy .snappy 和文字處理一致,不需要更改
壓縮格式 編碼/解碼器
DEFLATE org.apache.io.compress.DefaultCodec
gzip org.apache.io.compress.GzipCodec
bzip2 org.apache.io.compress.BZip2Codec
LZO org.apache.compression.lzo.LzopXodec
Snappy org.apache.io.compress.SnappyCodec
壓縮率:bzip2>gzip>LZO
耗時:bzip2>gzip>LZO