1. 程式人生 > 其它 >MapReduce框架原理--Shuffle機制

MapReduce框架原理--Shuffle機制

Shuffle機制

Mapreduce確保每個reducer的輸入都是按鍵排序的。系統執行排序的過程(Map方法之後,Reduce方法之前的資料處理過程)稱之為Shuffle。

partition分割槽

Partition分割槽流程處於Mapper資料屬於初到環形緩衝區時進行,此時會將通過Partition分割槽獲取到的每一行key-value對應的分割槽值計入環形緩衝流的左。

問題引出

要求將統計結果按照條件輸出到不同檔案中(分割槽)。比如:將統計結果按照手機歸屬地不同省份輸出到不同檔案中(分割槽)

分割槽可以實現將Map階段處理的資料在向環形緩衝區寫入的時候是以"分類"的方式寫的。"一般情況下",MR程式分割槽數有多少,ReduceTask的數量就應該有多少,可以實現一個分割槽的資料一個ReduceTask去處理。ReduceTask處理完成之後都會去生成一個結果檔案

以WordCount為例,設定ReduceTask的數量

job.setNumReduceTasks(2);

MapReduce底層預設分割槽機制

預設使用的是HashPartitioner分割槽機制

@Public
@Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {
public HashPartitioner() {
}

public int getPartition(K key, V value, int numReduceTasks) {
return (key.hashCode
() & 2147483647) % numReduceTasks; // 2147483647
int型別的最大值,numReduceTask即是上面設定的ReduceTask的值
}
}

這種分割槽機制是不可控的,因為它是根據Map階段獲取的key的hashCode值和numReduceTask取餘得來的,但是key的hashCode值不確定,所以把key-value資料分到哪一個區我們是不確定的。比如會出現不同的key的HashCode值一致,導致結果輸出的不可控制。因此我們在去定義分割槽的時候我們最常用的方法就是:自定義分割槽機制

自定義Partition(案例)

自定義Partition步驟:

  1. 繼承Partitioner類

    在繼承Partitioner類時,應該傳遞一個<key, value>的泛型,它代表的是需要進行分割槽的資料,所以需要傳遞的是map階段輸出的key-value型別,因為分割槽是在Map階段執行結束之後往ReduceTask階段在輸出資料時執行的

  2. 重寫裡面的getPartition方法,返回值是一個int型別,返回值就是我們的分割槽。執行邏輯返回一個從0開始的一個連續型數字。比如getPartition返回的取值有0 1 2 3 4時就代表有5個分割槽,0是1號分割槽,對應的檔案是part-r-00000檔案,以此類推。

    要求:

    1. 返回的分割槽數字最好是連續的,比如返回了 0 2 3 4 ,數字不連續,不可行
    2. 一般情況下,有幾個分割槽,就在Driver中指定numReduceTasks就有幾個,不能多寫也不能少寫

    public int getPartition(Text key, Text value, int numReduceTask)方法有三個引數,返回一個int型別的值,它們代表的含義分別是:

@param key:Map階段輸出的key值

@param value:Map階段輸出的value值

@param numReduceTask:定義的ReduceTask的任務數,預設是1

@return 數字,代表的是我要將這條key-value資料輸送到哪個分割槽

  1. 在Driver類中指定分割槽所在的類與分割槽數量

    // 定義不使用預設的HashPartitioner分割槽,而是使用自定義的分割槽
    job.setPartitionerClass(PhoneDataPartition.class);
    //
    指定你的ReduceTask必須是5
    job.setNumReduceTasks(5);

    這裡要求ReduceTask的數量必須和自定義的Partition類中設定的分割槽數量保持一致,原因就是在MR程式中一般預設情況下是一個分割槽要有一個ReduceTask專門去處理。但是在有些情況下,ReduceTask可能少寫或者多寫,這樣會出一些奇怪的問題。

    1. 假設Mapper階段輸出的分割槽5個,但是設定了1個ReduceTask任務去執行,程式可以執行成功,此時在HDFS中生成1個結果檔案。雖然有多個分割槽,但是一個ReduceTask可以處理這多個分割槽。
    2. 但是假設Mapper階段輸出的分割槽5個ReduceTask的數量少於5個,這時程式不能執行,且程式報IO異常的錯誤。(不患寡而患不均)

    java.lang.Exception: java.io.IOException: Illegal partition for 138 (2)

    (這裡可以利用打工人來舉例,假設有五個人一起去打工,但是此時崗位只有一個,那麼這五個人可以一起做著一份工作。但是如果設定了兩個崗位,分配時將會出現矛盾狀況,比如兩個工人都想做一份工作,或者兩個工人都不想做這一個工作,就會產生衝突)

    1. 假設分割槽5個ReduceTask的數量也有5個,那麼百分之百可以正常執行,這也是最佳狀態/最理想狀態(因為每個人都分配到了自己想做的工作)
    2. 假設分割槽5個ReduceTask的數量多於5個,程式也是百分之百可以正常執行,但是會多出一個空白結果檔案

注意】以後在工作中寫的ReduceTask的數量最好和分割槽的數量保持一致,這樣的話才能保證處理出的MR程式處於最佳狀態

案例一:電話歸屬地

  1. 需求:將統計結果按照手機歸屬地不同省份輸出到不同檔案中(分割槽)
  2. 分割槽:

136----分割槽1

137----分割槽2

138----分割槽3

139----分割槽4

其他---分割槽5

預設分割槽機制:5個分割槽,需要設定5個ReduceTask,同時預設分割槽機制是按照key的HashCode值分配的

  1. 程式碼:
    1. PhoneDataMapper.java

    public class PhoneDataMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString();
    String[] fields = line.split("\t");
    String phone = fields[1];
    //
    拿到手機號的前三位
    String phoneThree = phone.substring(0, 3);
    context.write(new Text(phoneThree), value);
    }
    }
    1. PhoneDatePartition.java

    public class PhoneDataPartition extends Partitioner<Text, Text> {
    @Override
    public int getPartition(Text key, Text value, int numReduceTask) {
    String s = key.toString();
    switch (s) {
    case "136":
    return 0;
    case "137":
    return 1;
    case "138":
    return 2;
    case "139":
    return 3;
    default:
    return 4;
    }
    }
    }
    1. PhoneDataReducer.java

    public class PhoneDataReducer extends Reducer<Text, Text, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    for (Text value : values) {
    context.write(NullWritable.get(), new Text(value));
    }
    }
    }
    1. PhoneDateDriver.java

    public class PhoneDataDriver {
    public static void main(String[] args) throws Exception {
    //
    獲取job
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);
    //
    設定jar
    job.setJarByClass(PhoneDataDriver.class);
    //
    關聯MapperReducer
    job.setMapperClass(PhoneDataMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setReducerClass(PhoneDataReducer.class);
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    //
    管理檔案的輸入和檔案的輸出
    FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.218.55:9000/school/phone_data.txt"));
    Path output = new Path("hdfs://192.168.218.55:9000/test/school/phone_data");
    FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root");
    if (fs.exists(output)) {
    fs.delete(output, true);
    }
    /*//
    設定ReduceTask的數量
    job.setNumReduceTasks(5);*/
    //
    定義不使用預設的HashPartitioner分割槽,而是使用自定義的分割槽
    job.setPartitionerClass(PhoneDataPartition.class);
    //
    指定你的ReduceTask必須是5
    job.setNumReduceTasks(5);
    FileOutputFormat.setOutputPath(job, output);
    //
    提交執行
    boolean result = job.waitForCompletion(true);
    System.exit(result ? 0 : 1);
    }
    }
  2. 執行截圖

案例二:單詞計數

  1. 需求:按照單詞首字母的ASCII碼進行奇偶分割槽(Partitioner)
  2. 程式分析

  3. 原始碼

    WordCountPartition.java

public class WordCountPartition extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
String word = text.toString();
char first = word.charAt(0);
if (first % 2 == 0 ) {
return 0;
} else {
return 1;
}
}
}
  1. 執行結果

  2. 舉一反三

單詞計數案例:要求根據單詞首字母的大小寫分割槽,如果單詞首字母是大寫,那麼單詞輸出在一個檔案中,如果單詞的首字母是小寫,那麼單詞輸出在另一個分割槽中

原始碼:

public class OtherPartition extends Partitioner<Text, LongWritable> {
@Override
public int getPartition(Text text, LongWritable longWritable, int i) {
String word = text.toString();
char first = word.charAt(0);
if (Character.isUpperCase(first)) {
return 1;
} else {
return 0;
}
}
}

執行結果:

WritableComparable排序

排序概述

  • 排序是MapReduce框架中最重要的操作之一。
  • MapTask和ReduceTask均會對資料按照key進行排序。該操作屬於Hadoop的預設行為。任何應用程式中的資料均會被排序,而不管邏輯上是否被需要
  • 預設排序是按照字典順序排序,且實現該排序的方法是快速排序。
  • 對於MapTask,他會將處理的結果暫時放到環形緩衝區中,當環形緩衝區使用率達到一定閾值後,再對緩衝區中的資料進行一次快速排序,並將這些有序資料溢位到磁碟上,而當這些資料處理完畢後,他會對磁碟上的所有檔案進行歸併排序。
  • 對於ReduceTask,他從每個MapTask上遠端拷貝相應的資料檔案,如果檔案大小超過閾值,則溢寫到磁碟上,否則儲存在記憶體中。如果磁碟上檔案數目達到一定閾值,則進行一次歸併排序以生成一個更大的檔案、如果記憶體最終檔案大小或者數目超過一定閾值,則進行一次合併後將資料溢寫到磁碟上。當所有資料拷貝完成後,ReduceTask統一對記憶體和磁碟上的所有資料進行一次歸併排序

問題在一次MR程式中需要進行幾次排序?在什麼時候進行排序?

解答】會進行三次排序。第一次是在環形緩衝區使用率達到一定閾值(80%)時,會對緩衝區中的資料進行一次快速排序;第二次是當資料處理完畢時,會對磁碟上的所有檔案進行歸併排序;第三次是ReduceTask統一對記憶體和磁碟上的所有資料進行一次歸併排序

排序的分類

部分排序全排序的區別:全排序是一個結果檔案,部分排序是多個結果檔案。在實際程式設計中也就是添加了自定義排序和設定NumReduceTask()

要想實現使自己的資料充當key值進行排序,必須實現WritableComparable介面的方法。

全排序案例

  1. 需求:

    將單詞計數後的結果按照數量升序排序

  2. 原始碼:
    1. WordCountBean.java

    public class WordCountBean implements WritableComparable<WordCountBean> {
    private String word;
    private int count;

    public String getWord() {
    return word;
    }

    public void setWord(String word) {
    this.word = word;
    }

    public int getCount() {
    return count;
    }

    public void setCount(int count) {
    this.count = count;
    }

    @Override
    public String toString() {
    return word + " " + count;
    }

    /**
    *
    比較器預設大於或者小於時返回一個非零的數,如果等於則返回0,代表兩個物件一模一樣
    * MR程式比較大小的時候,千萬不能返回0.如果返回0,代表兩個物件一樣,那麼相當於認為key是一樣的,此時其餘的相等資料將顯示不出來
    * @param o
    * @return
    */
    @Override
    public int compareTo(WordCountBean o) {
    if (this.count > o.getCount()) {
    return 1;
    } else {
    return -1;
    }
    }


    @Override
    public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(this.word);
    dataOutput.writeInt(this.count);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
    this.word = dataInput.readUTF();
    this.count = dataInput.readInt();
    }
    }
  3. 執行截圖

    部分排序案例

部分排序只是相對於全排序多了自定義分割槽以及設定ReduceTask的數量

  1. 需求

    根據手機號碼前三位進行分割槽,前三位為"135",則位於第一分割槽;前三位為"136",則位於第二分割槽;前三位為"137",則位於第三分割槽,其餘資料在第四分割槽,並且按照總流量數升序排列

  2. 原始碼
    1. FlowBean.java

public class FlowBean implements WritableComparable<FlowBean> {
private String phone;
private int upFlow;
private int downFlow;
private int sumFlow;

public String getPhone() {
return phone;
}

public void setPhone(String phone) {
this.phone = phone;
}

public int getUpFlow() {
return upFlow;
}

public void setUpFlow(int upFlow) {
this.upFlow = upFlow;
}

public int getDownFlow() {
return downFlow;
}

public void setDownFlow(int downFlow) {
this.downFlow = downFlow;
}

public int getSumFlow() {
return sumFlow;
}

public void setSumFlow(int sumFlow) {
this.sumFlow = sumFlow;
}

@Override
public String toString() {
return phone + " " + upFlow + " " + downFlow + " " + sumFlow;
}

/**
*
序列化方法:將Java物件的屬性值怎麼序列化寫出
* @param dataOutput
* @throws IOException
*/
@Override
public void write(DataOutput dataOutput) throws IOException {
//
將一個String型別的屬性序列化寫出成二進位制資料
dataOutput.writeUTF(this.phone);
//
將一個String型別的屬性序列化寫出成二進位制資料
dataOutput.writeInt(upFlow);
dataOutput.writeInt(downFlow);
dataOutput.writeInt(sumFlow);
}

/**
*
反序列化方法:怎麼將二進位制程式碼轉成JavaBean物件屬性的值
* 反序列化的時候,讀取二進位制資料時,不能隨便讀
* 序列化寫出時先寫出哪個屬性的值,就先讀哪個屬性值
* @param dataInput
* @throws IOException
*/
@Override
public void readFields(DataInput dataInput) throws IOException {
this.phone = dataInput.readUTF();
this.upFlow = dataInput.readInt();
this.downFlow = dataInput.readInt();
this.sumFlow = dataInput.readInt();
}

/**
*
傳進來一個物件,我們需要在這個方法中定義我和傳進來的這個物件誰大誰小的邏輯
* 一般情況下,JavaBean物件去比較大小都是比較屬性值的大小
* 如果返回的是1,那麼代表大於傳進來的物件,升序排序
* 如果返回的是0.那麼代表相等
* 返回的是-1,代表小於傳進來的物件,降序排序
*
* @param o
* @return
*/
@Override
public int compareTo(FlowBean o) {
return Integer.compare(this.sumFlow, o.getSumFlow());
}
}
  1. FlowPartition.java

public class FlowPartition extends Partitioner<FlowBean, NullWritable> {
@Override
public int getPartition(FlowBean flowBean, NullWritable nullWritable, int i) {
String phone = flowBean.getPhone();
String three = phone.substring(0, 3);
switch (three) {
case "135":
return 0;
case "136":
return 1;
case "137":
return 2;
default:
return 3;
}
}
}
  1. 執行截圖

    二次排序案例

  2. 需求

    學生成績排序:先根據語文成績降序排序,如果語文成績相同,再根據英語成績降序排序

  3. 原始碼

    RecordBean.java

    public class RecordBean implements WritableComparable<RecordBean> {
    private String name;
    private int china;
    private int maths;
    private int english;

    public String getName() {
    return name;
    }

    public void setName(String name) {
    this.name = name;
    }

    public int getChina() {
    return china;
    }

    public void setChina(int china) {
    this.china = china;
    }

    public int getMaths() {
    return maths;
    }

    public void setMaths(int maths) {
    this.maths = maths;
    }

    public int getEnglish() {
    return english;
    }

    public void setEnglish(int english) {
    this.english = english;
    }

    @Override
    public String toString() {
    return "name='" + name + '\'' +
    ", china=" + china +
    ", maths=" + maths +
    ", english=" + english;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
    dataOutput.writeUTF(this.name);
    dataOutput.writeInt(china);
    dataOutput.writeInt(maths);
    dataOutput.writeInt(english);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
    this.name = dataInput.readUTF();
    this.china = dataInput.readInt();
    this.maths = dataInput.readInt();
    this.english = dataInput.readInt();
    }

    @Override
    public int compareTo(RecordBean o) {
    if (this.china > o.getChina()) {
    return -1;
    }else if(this.china == o.getChina()) {
    if (this.english > o.getEnglish()) {
    return -1;
    } else {
    return 1;
    }
    }
    else {
    return 1;
    }
    }
    }
  4. 執行截圖

    分組輔助排序

Combiner合併

Combiner基本概念

Combiner合併其實是MapReduce程式中的一個調優策略

如果多個 MapTask對應一個ReduceTask任務時,ReduceTask的壓力會很大,這時可以將ReduceTask的數量增加,這樣每個ReduceTask只需要處理一個MapTask,壓力會小很多。

還有一種方式就是在Mapper階段設定一個Combiner合併操作,每一個Combiner對應一個MapTask任務,它會將MapTask任務先進行一次合併,之後再交由ReduceTask執行。

Combiner是可選操作,他只是MR程式中的一個調優策略。但是分割槽和排序是必選的

Combiner處於Map階段執行之後,Reduce階段執行之前,其中Combiner的輸入是Mapper階段的輸出Key-value,輸出時Reduce階段輸入的key-value

Combiner程式碼實現

  1. 自定義一個combiner繼承Reducer,重寫reduce方法

    public class WordCountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
    Iterator<LongWritable> iterator = values.iterator();
    long num = 0L;
    while (iterator.hasNext()) {
    LongWritable next = iterator.next();
    long l = next.get();
    num += l;
    }
    context.write(key, new LongWritable(num));
    }
    }

在job驅動類中設定:

job.setCombinerClass(WordCountCombiner.class);
  1. 將reduce當作Combiner元件使用

    通過上述程式碼可以發現,combiner類寫的邏輯方法與Reduce階段的相同(一般情況下兩個的處理邏輯都一樣),所以可以直接在Driver中設定Combiner類為Reduce類。注意:雖然二者程式碼一樣,但是執行的邏輯不一樣。

    在job驅動類中設定

    job.setCombinerClass(WordCountReduce.class);
    

    Combiner總結

  2. Combiner執行時機在Mapper結束之後,Reducer開始之前
  3. Combiner操作可選擇,可以新增也可以不新增,不新增的話Map階段的資料會直接傳給Reducer。如果加上這個操作,那麼Map階段的資料先給Combiner,再從Combiner給Reducer。同時注意,map階段的輸出就是Combiner的輸入,Combiner的輸出就是Reducer的輸入
  4. Combiner和Reducer的邏輯非常像,都是根據key值,將相同的key值的value值集中起來走一次Reducer。不同點在於Combiner對每一個MapTask都會去執行一次,而Reducer是將每一個Maptask的資料全部拉過來再去執行,這樣效率比較低

    GroupingComparator分組(輔助排序)

    分組排序簡介

這個分組排序又叫輔助排序,也是可選的。但是在有些業務條件下,分組排序必須存在。

主要作用:如果我們定義的key是JavaBean物件,我們可以在分組中將部分欄位相同的bean物件當做同一個key處理。

分組排序的時機

map階段資料分割槽排序合併結束,reduce已經將map階段的輸出資料讀取到記憶體中,即將執行Reducer方法之前

分組排序的作用

  1. 將資料進行排序(一般情況下都用WritableComparable)
  2. 判斷滿足什麼條件key值就相等了。
    1. Text型別判斷的就是值,就是相等的key;
    2. LongWritable型別也是判斷值相等,就是相等的key;
    3. 對於自定義的JavaBean物件,如何判斷相等呢?是判斷地址相等嗎?

      補充】

      例如:Student s1 = new Student("zs", "8374772648327498");

      Student s2 = new Student("ls", "8374772648327498");

      這兩個物件相等嗎?

      從程式碼角度而言,這兩個物件不相等,因為他們的地址不相等

      但是Java是面向物件程式設計的語言,面向物件就是拿現實的邏輯去理解程式碼,所以從現實的角度來看,這兩個物件是同一個物件,因為他們的身份證號一樣

      所以需要主要一個問題,在Java中,比較基本資料型別是否相等,應該用 == 去判斷;但是如果需要比較兩個引用資料是否相等,需要通過兩個物件的屬性值是否相等來判斷兩個物件是否為同一個物件,一般使用equals方法和compare方法

      有了以上基礎,再看上面的問題。對於JavaBean物件是否相等的判斷,不應該用地址判斷,我們需要判斷物件中的某些屬性是否相等。所以在MR程式中,如果你想根據我們自己的邏輯將不同的JavaBean物件當做同一個key處理,我們可以通過分組輔助排序實現。當然我們的WritableComparable也可以去實現,但是在業務邏輯上這個介面主要是做排序使用的。如果想在Reducer階段讓不同的Java物件成為一組相同的key,那麼你就需要定義一個分組排序規則,在裡面定義一下哪個物件是相同的key

    分組排序的實現思路

案例:訂單排序

  1. 需求

    有如下訂單資料

訂單id

商品id

成交金額

0000001

Pdt_01

222.8

0000001

Pdt_05

25.8

0000002

Pdt_03

522.8

0000002

Pdt_04

122.4

0000002

Pdt_05

722.4

0000003

Pdt_01

222.8

0000003

Pdt_02

33.8

現在需要求出每一個訂單中最貴的商品。

  1. 思路
    1. 繼承extendsWritableComparator
    2. 定義一個構造器,指定判斷相等的是那個JavaBean物件,並且傳一個true(如果不定義,會報空指標異常的錯誤,因為自定義的比較器預設傳入的值都是null值)
    3. 重寫compare(WritableComparable a, WritableComparable b) 方法,在裡面寫我們比較的邏輯

    注意】這裡的引數都是WritableComparable型別,實際上是Mapper階段輸出的key的型別。這裡可以使用多型的方式對引數進行強轉

    1. 在Driver類中對分組排序類進行配置

      job.setGroupingComparatorClass(OrderGroup.class);
      
  2. 原始碼

    public class OrderGroupComparable extends WritableComparator {
    public OrderGroupComparable() {
    super(Order.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
    Order order1 = (Order) a;
    Order order2 = (Order) b;
    if (order1.getId() > order2.getId()) {
    return -1;
    } else if (order1.getId() == order2.getId()) {
    return 0;
    } else {
    return -1;
    }
    }
    }
  3. 執行截圖