1. 程式人生 > >分組分割槽和資料傾斜combiner

分組分割槽和資料傾斜combiner

reducetask的並行度----分割槽:     yarchild--            1maptask---MyMapper/1reducetask任務----MyReducer     執行reduce任務的並行度     目前情況下  reducetask只有一個     19557 YarnChild    ---   reducetask     預設情況下  reducetask的個數只有1個         一個reducetask只能執行在一個節點上         當資料量很大時候  只有一個reducetask不合理的               1)這個reducetask的壓力很大             2)負載不均衡         reducetask也應該根據我們的實際的資料  設定多個             如何設定:                 程式碼設定:                     job.setNumReduceTasks(tasks);                     引數代表就是reducetask的個數  需要幾個reducetask的時候  設定為幾就可以                     這個引數值  預設為1  預設情況只執行一個reducetask                     job.setNumReduceTask;                     發現   輸出的結果3個檔案   1個標誌檔案                         part-r-00000                         part-r-00001                         part-r-00002                     結論:一個reducetask最終輸出一個對應的結果檔案                     reducetask中的資料如何劃分的:                         預設情況下:                             hash分割資料的                             key.hash%reducetask的個數                             一個reducetask的資料對應的是一個分割槽的資料                             分割槽:對map輸出的資料進行一個按照一定的規則劃分  每一部分稱為一個分割槽                                     partition                                     這裡的分割規則叫分割槽演算法                                 推斷:預設的分割槽演算法:hash演算法  1)雜湊  2)唯一                                                              底層實現:                                 預設的抽象類  Partitioner抽象類   定義分割槽演算法/分割槽規則的                                 public abstract class Partitioner<KEY, VALUE> {                                     //返回值:int    這裡的返回值代表的是分割槽編號 每一個分割槽的唯一標誌 預設從0開始   順序遞增的                                     //引數:引數1-map輸出的key  引數2-map輸出的value   引數3-分割槽個數(job.setNumReduceTask())                                     public abstract int getPartition                                     (KEY key, VALUE value, int numPartitions);                                 }                                 預設呼叫的實現類:HashPartitioner                                     //泛型指的是map輸出的k   v的型別                                     public class HashPartitioner<K, V> extends Partitioner<K, V> {                                         //預設的引數3   numReduceTasks                                       /** Use {@link Object#hashCode()} to partition. */                                       public int getPartition(K key, V value,                                                               int numReduceTasks) {                                         return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;                                       }

                                    }                                     key.hashCode() & Integer.MAX_VALUE   目的:防止溢位  範圍控制integer_max                                        1011001100111010101010101                                                                             &                                                                             111111111111111111111111                                         --------------------------                                         011001100111010101010101                             預設的分割槽演算法   取map的key的hash值%reducetask的個數  獲取返回值就是分割槽編號     總結:預設情況下         一個分割槽----一個reducetask----一個輸出結果檔案             part-r-00001    00001---代表分割槽編號         預設情況下  分割槽演算法  hash分割槽         (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;         預設情況下  分割槽演算法  和 reducetask的個數一一對應的         reducetask的並行度  取決於reducetask的個數   job.setNumReduceTasks         reducetask的資料如何分配  取決於分割槽演算法的  預設的hash分割槽     自定義分割槽:         預設的分割槽演算法  並不能滿足所有的需求   不能自定義資料的去向的         假設:淘寶資料             江蘇             浙江             上海             內蒙             按照地域進行劃分reducetask的資料  預設的就不可以  需要自定義分割槽         寫法:             1)繼承Partitioner類             2)重寫getPartition 方法  這個方法的返回值  代表就是分割槽編號         案例:             將流量彙總統計結果按照手機歸屬地不同省份輸出到不同檔案中             手機號的前三位---手機號歸屬地             輸出到不同的檔案-----不同的reducetask的輸出---不同分割槽             分割槽演算法:按照手機號的歸屬地進行劃分的                 歸屬地不同---不同分割槽--不同reducetask---不同檔案             自定義分割槽:                 分割槽欄位map的key的位置                 按照手機號分割槽                 shuffle             map端:                 key:手機號                 value:剩下的             reduce端:                 輸出                          //指定自定義分割槽         job.setPartitionerClass(MyPartitioner.class);         //指定reducetask的個數         job.setNumReduceTasks(3);         報錯:             分割槽編號   值3   reducetask-3個  redutask0--分割槽0  reducetask1=分割槽1  reducetask2=分割槽2               報錯:                     Illegal partition for 15013685858 (3)                 原因  分割槽和reducetask的個數不匹配             注意:設定分割槽編號的時候  最好順序遞增的  不要跳數             分割槽個數3 個  reducetask的個數可以設定:                 1=====可以   對應一個輸出結果                 2=====不可以                 3可以                 >3  可以                 reducetask的個數  為1 的時候   所有的分割槽的資料  預設全部到一個reducetask中                 reducetask的個數大於1   按照分割槽編號  對應的分割槽到對應的reducetask中                 設定的時候reducetask的個數=分割槽個數     分割槽演算法:             預設分割槽:key.hash%reducetask的個數         自定義分割槽:自己定義的         分割槽演算法決定map輸出的資料  如何分配給reduce的           資料傾斜:     多個reducetask平行計算的時候  某一個reducetask分配資料不均勻   這個時候產生現象資料傾斜     後果:reducetask的整體執行的效能低     100reducetask     99reducetask--1T   10min     1reducetask-----100T  1000min     進度:         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         map  100%  reduce  99%         ...............     大資料中不怕資料量大  怕資料傾斜     很快reducetask達到90%  卡     資料傾斜一定需要儘量避免的     原因:分割槽演算法沒有合理的分配均勻資料     解決:合理設計分割槽演算法  根據實際的資料抽樣做測試  進行合理設計分割槽  業務、資料     mapreduce階段:         maptask:             1個切片預設128M---1maptask             1)maptask的並行度比較高                 切片個數---資料塊的個數(最後一個切片有可能跨塊的)             2)maptask資料一個切片  128M  對應的資料量比較小         reducetask:             容易產生資料傾斜             1)並行度不高                 job.setNumReduceTasks()                 經驗值:reducetask最大值  datanode*0.95  100---95             2)reducetask的資料分配取決於分割槽演算法               3)reducetask的對應的資料量本身比較大 combiner元件:     這個元件不適用所有的場景的     優化元件:提升效能的作用  這個元件預設不加的     作用:對到reducetask之前的資料做預處理  幫助reducetask做預處理  減少reducetask的資料量  提升效能     預設沒有元件:         怎麼加?         這個元件就是幫助reducetask做事情  減輕redcetask的壓力             提前幫助reducetask做一些自己的工作             combiner業務邏輯和reducetask的邏輯一樣  作用的時間點在maptask之後  reducetask之前         怎麼實現?             這個元件不會影響業務邏輯             和reducetask的實現是一樣             1)繼承Reducer類             2)重寫reduce方法             3)job中設定新增combiner元件             job.setCombinerClass(MyCombiner.class);         實現 和reducetask的實現一樣 實際的開發過程直接使用reducer的類作為combiner的類                  combiner的可以適用:             求和             求最大值             求最小值         Combiner不使用場景:             求平均值     注意:這個元件使用的時候  慎重考慮 分組:     map----shuffle-----reduce     shuffle:分割槽----排序----分組     分組到底是怎麼分組的:按照map的key進行分組的         預設的型別:             wc:             000000000000000000000000000             hadoop----------------64             hadoop----------------64             hadoop----------------64             hadoop----------------64             000000000000000000000000000             hadoophello----------------1             hadoophello----------------1             hadoophello----------------1             hadoophello----------------1             相同的key為一組             預設的是key一直隨著value指標變化而變化的 只不過預設的key都是一樣的          自定義的型別:             111111111111111111111111111111             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)             預設的分組按照自定義型別的comparaTo的方法進行分組的             現根據科目  再根據  成績             將相同科目  和  相同分數的人劃分到一組                          111111111111111111111111111111computer    huangzitao    72.42857142857143   //這一組中的第一個key             map:context.write(sb,null)             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)                          reduce:     預設分組:呼叫的就是所有型別ComparaTo方法---排序的方法     有的需求中 分組和排序如果有衝突,怎麼辦?     案例:         3、求出每門課程參考學生平均成績top3的學生的資訊:課程,姓名和平均分

            課程   姓名    成績             computer    liujialing    98             computer    huangbo        96             computer    liutao        95             math        huangdatou    99             math        liuyifei    98             math        liutao        95             map的輸出的key:自定義型別(課程  姓名   成績)             排序:按照成績             分組:課程                          comparaTo(){                 先按課程                 成績             }                          實際的上的分組:comparaTo()                 按照:課程+成績             每個課程的前三  保證reduce一次性可以接受的資料是同一門課程的             排序  和  分組  衝突了             分組不能使用剛才的排序的規則的了             排序:按照成績             分組:課程             自定義分組:                 分組:課程                 分組的底層  對map輸出所有的key進行比較  相同的(return 0)key 認為一組                 比較的過程中  返回的!=0   認為不是同一組                 writableComparator  分組規則的  普通類                                  預設的分組的比較:                   @SuppressWarnings("unchecked")                  //map輸出的key-----比較  序列化   WritableComparable                 //引數:WritableComparable                 //分組針對map輸出的key                 public int compare(WritableComparable a, WritableComparable b) {                 //預設的分組就是呼叫  自定義的comparaTo                     return a.compareTo(b);                 }                                  自定義分組:重寫compare                     1)繼承writableComparator                     2)重寫compare()                     3)job中指定                     job.setGroupingComparatorClass(MyGroup.class);                                          報錯:                     java.lang.Exception: java.lang.NullPointerException                     空指標異常的錯誤:                     compare()  兩個引數的物件預設不會幫我們建立的                                                                /** Construct for a {@link WritableComparable} implementation. */                       protected WritableComparator(Class<? extends WritableComparable> keyClass) {                         this(keyClass, null, false);                       }

                      protected WritableComparator(Class<? extends WritableComparable> keyClass,                           boolean createInstances) {                         this(keyClass, null, createInstances);                       }

                      //引數1   比較的物件型別的class  ScoreBean                       //引數2   配置檔案物件                       //引數3---是否建立WritableComparable  這個物件  預設false  代表不建立                       protected WritableComparator(Class<? extends WritableComparable> keyClass,                                                    Configuration conf,                                                    boolean createInstances) {                                          排序:分數                     分組:課程                     =========================                     computer    huangjiaju    83.2-----------(null)                     computer    huangjiaju    83.2-----------(null)                     computer    liutao    83.0-----------(null)                     =========================                     math    huangxiaoming    83.0-----------(null)                     =========================                     english    huanglei    83.0-----------(null)                     =========================                     computer    liutao    83.0-----------(null)                     先進行的排序  在進行的分組                     排完序:                     computer    huangjiaju    83.2-----------(null)                     computer    huangjiaju    83.2-----------(null)                     computer    liutao    83.0-----------(null)                     math    huangxiaoming    83.0-----------(null)                     english    huanglei    83.0-----------(null)                     computer    liutao    83.0-----------(null)                     分組在排序的基礎上進行的:判斷相鄰的                     computer    huangjiaju    83.2-----------(null)                     computer    huangjiaju    83.2-----------(null)                     computer    liutao    83.0-----------(null)                                                               math    huangxiaoming    83.0-----------(null)                                                               english    huanglei    83.0-----------(null)                                                               computer    liutao    83.0-----------(null)                                          排序的時候需要將分組的欄位放在一起                                          排序:                     課程   分數                     分組:                     課程                                                               既有排序 a b 又有分組  c                     排序:c a  b                     分組:c                     排序的欄位中必須包含 分組欄位  分組欄位必須在排序欄位的前面                                               

補充:     reduce中的迭代器:         1)只能迭代一次    迭代的過程中指標的操作             只要迭代一次  指標調到末尾         2)迭代器中的資料對應的一組的所有的value             這個迭代器中每一個value都對應一個自己的key                                       111111111111111111111111111111computer    huangzitao    72.42857142857143             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)             computer    huangzitao    72.42857142857143-----------(null)             computer    huangxiaoming    72.42857142857143-----------(null)             ==============computer    huangxiaoming    72.42857142857143             每一組中的這個key是一直隨著value指標移動而移動的             每一個value都會對應一個key值