分組分割槽和資料傾斜combiner
reducetask的並行度----分割槽: yarchild-- 1maptask---MyMapper/1reducetask任務----MyReducer 執行reduce任務的並行度 目前情況下 reducetask只有一個 19557 YarnChild --- reducetask 預設情況下 reducetask的個數只有1個 一個reducetask只能執行在一個節點上 當資料量很大時候 只有一個reducetask不合理的 1)這個reducetask的壓力很大 2)負載不均衡 reducetask也應該根據我們的實際的資料 設定多個 如何設定: 程式碼設定: job.setNumReduceTasks(tasks); 引數代表就是reducetask的個數 需要幾個reducetask的時候 設定為幾就可以 這個引數值 預設為1 預設情況只執行一個reducetask job.setNumReduceTask; 發現 輸出的結果3個檔案 1個標誌檔案 part-r-00000 part-r-00001 part-r-00002 結論:一個reducetask最終輸出一個對應的結果檔案 reducetask中的資料如何劃分的: 預設情況下: hash分割資料的 key.hash%reducetask的個數 一個reducetask的資料對應的是一個分割槽的資料 分割槽:對map輸出的資料進行一個按照一定的規則劃分 每一部分稱為一個分割槽 partition 這裡的分割規則叫分割槽演算法 推斷:預設的分割槽演算法:hash演算法 1)雜湊 2)唯一 底層實現: 預設的抽象類 Partitioner抽象類 定義分割槽演算法/分割槽規則的 public abstract class Partitioner<KEY, VALUE> { //返回值:int 這裡的返回值代表的是分割槽編號 每一個分割槽的唯一標誌 預設從0開始 順序遞增的 //引數:引數1-map輸出的key 引數2-map輸出的value 引數3-分割槽個數(job.setNumReduceTask()) public abstract int getPartition (KEY key, VALUE value, int numPartitions); } 預設呼叫的實現類:HashPartitioner //泛型指的是map輸出的k v的型別 public class HashPartitioner<K, V> extends Partitioner<K, V> { //預設的引數3 numReduceTasks /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }
} key.hashCode() & Integer.MAX_VALUE 目的:防止溢位 範圍控制integer_max 1011001100111010101010101 & 111111111111111111111111 -------------------------- 011001100111010101010101 預設的分割槽演算法 取map的key的hash值%reducetask的個數 獲取返回值就是分割槽編號 總結:預設情況下 一個分割槽----一個reducetask----一個輸出結果檔案 part-r-00001 00001---代表分割槽編號 預設情況下 分割槽演算法 hash分割槽 (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 預設情況下 分割槽演算法 和 reducetask的個數一一對應的 reducetask的並行度 取決於reducetask的個數 job.setNumReduceTasks reducetask的資料如何分配 取決於分割槽演算法的 預設的hash分割槽 自定義分割槽: 預設的分割槽演算法 並不能滿足所有的需求 不能自定義資料的去向的 假設:淘寶資料 江蘇 浙江 上海 內蒙 按照地域進行劃分reducetask的資料 預設的就不可以 需要自定義分割槽 寫法: 1)繼承Partitioner類 2)重寫getPartition 方法 這個方法的返回值 代表就是分割槽編號 案例: 將流量彙總統計結果按照手機歸屬地不同省份輸出到不同檔案中 手機號的前三位---手機號歸屬地 輸出到不同的檔案-----不同的reducetask的輸出---不同分割槽 分割槽演算法:按照手機號的歸屬地進行劃分的 歸屬地不同---不同分割槽--不同reducetask---不同檔案 自定義分割槽: 分割槽欄位map的key的位置 按照手機號分割槽 shuffle map端: key:手機號 value:剩下的 reduce端: 輸出 //指定自定義分割槽 job.setPartitionerClass(MyPartitioner.class); //指定reducetask的個數 job.setNumReduceTasks(3); 報錯: 分割槽編號 值3 reducetask-3個 redutask0--分割槽0 reducetask1=分割槽1 reducetask2=分割槽2 報錯: Illegal partition for 15013685858 (3) 原因 分割槽和reducetask的個數不匹配 注意:設定分割槽編號的時候 最好順序遞增的 不要跳數 分割槽個數3 個 reducetask的個數可以設定: 1=====可以 對應一個輸出結果 2=====不可以 3可以 >3 可以 reducetask的個數 為1 的時候 所有的分割槽的資料 預設全部到一個reducetask中 reducetask的個數大於1 按照分割槽編號 對應的分割槽到對應的reducetask中 設定的時候reducetask的個數=分割槽個數 分割槽演算法: 預設分割槽:key.hash%reducetask的個數 自定義分割槽:自己定義的 分割槽演算法決定map輸出的資料 如何分配給reduce的 資料傾斜: 多個reducetask平行計算的時候 某一個reducetask分配資料不均勻 這個時候產生現象資料傾斜 後果:reducetask的整體執行的效能低 100reducetask 99reducetask--1T 10min 1reducetask-----100T 1000min 進度: map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% map 100% reduce 99% ............... 大資料中不怕資料量大 怕資料傾斜 很快reducetask達到90% 卡 資料傾斜一定需要儘量避免的 原因:分割槽演算法沒有合理的分配均勻資料 解決:合理設計分割槽演算法 根據實際的資料抽樣做測試 進行合理設計分割槽 業務、資料 mapreduce階段: maptask: 1個切片預設128M---1maptask 1)maptask的並行度比較高 切片個數---資料塊的個數(最後一個切片有可能跨塊的) 2)maptask資料一個切片 128M 對應的資料量比較小 reducetask: 容易產生資料傾斜 1)並行度不高 job.setNumReduceTasks() 經驗值:reducetask最大值 datanode*0.95 100---95 2)reducetask的資料分配取決於分割槽演算法 3)reducetask的對應的資料量本身比較大 combiner元件: 這個元件不適用所有的場景的 優化元件:提升效能的作用 這個元件預設不加的 作用:對到reducetask之前的資料做預處理 幫助reducetask做預處理 減少reducetask的資料量 提升效能 預設沒有元件: 怎麼加? 這個元件就是幫助reducetask做事情 減輕redcetask的壓力 提前幫助reducetask做一些自己的工作 combiner業務邏輯和reducetask的邏輯一樣 作用的時間點在maptask之後 reducetask之前 怎麼實現? 這個元件不會影響業務邏輯 和reducetask的實現是一樣 1)繼承Reducer類 2)重寫reduce方法 3)job中設定新增combiner元件 job.setCombinerClass(MyCombiner.class); 實現 和reducetask的實現一樣 實際的開發過程直接使用reducer的類作為combiner的類 combiner的可以適用: 求和 求最大值 求最小值 Combiner不使用場景: 求平均值 注意:這個元件使用的時候 慎重考慮 分組: map----shuffle-----reduce shuffle:分割槽----排序----分組 分組到底是怎麼分組的:按照map的key進行分組的 預設的型別: wc: 000000000000000000000000000 hadoop----------------64 hadoop----------------64 hadoop----------------64 hadoop----------------64 000000000000000000000000000 hadoophello----------------1 hadoophello----------------1 hadoophello----------------1 hadoophello----------------1 相同的key為一組 預設的是key一直隨著value指標變化而變化的 只不過預設的key都是一樣的 自定義的型別: 111111111111111111111111111111 computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) 預設的分組按照自定義型別的comparaTo的方法進行分組的 現根據科目 再根據 成績 將相同科目 和 相同分數的人劃分到一組 111111111111111111111111111111computer huangzitao 72.42857142857143 //這一組中的第一個key map:context.write(sb,null) computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) reduce: 預設分組:呼叫的就是所有型別ComparaTo方法---排序的方法 有的需求中 分組和排序如果有衝突,怎麼辦? 案例: 3、求出每門課程參考學生平均成績top3的學生的資訊:課程,姓名和平均分
課程 姓名 成績 computer liujialing 98 computer huangbo 96 computer liutao 95 math huangdatou 99 math liuyifei 98 math liutao 95 map的輸出的key:自定義型別(課程 姓名 成績) 排序:按照成績 分組:課程 comparaTo(){ 先按課程 成績 } 實際的上的分組:comparaTo() 按照:課程+成績 每個課程的前三 保證reduce一次性可以接受的資料是同一門課程的 排序 和 分組 衝突了 分組不能使用剛才的排序的規則的了 排序:按照成績 分組:課程 自定義分組: 分組:課程 分組的底層 對map輸出所有的key進行比較 相同的(return 0)key 認為一組 比較的過程中 返回的!=0 認為不是同一組 writableComparator 分組規則的 普通類 預設的分組的比較: @SuppressWarnings("unchecked") //map輸出的key-----比較 序列化 WritableComparable //引數:WritableComparable //分組針對map輸出的key public int compare(WritableComparable a, WritableComparable b) { //預設的分組就是呼叫 自定義的comparaTo return a.compareTo(b); } 自定義分組:重寫compare 1)繼承writableComparator 2)重寫compare() 3)job中指定 job.setGroupingComparatorClass(MyGroup.class); 報錯: java.lang.Exception: java.lang.NullPointerException 空指標異常的錯誤: compare() 兩個引數的物件預設不會幫我們建立的 /** Construct for a {@link WritableComparable} implementation. */ protected WritableComparator(Class<? extends WritableComparable> keyClass) { this(keyClass, null, false); }
protected WritableComparator(Class<? extends WritableComparable> keyClass, boolean createInstances) { this(keyClass, null, createInstances); }
//引數1 比較的物件型別的class ScoreBean //引數2 配置檔案物件 //引數3---是否建立WritableComparable 這個物件 預設false 代表不建立 protected WritableComparator(Class<? extends WritableComparable> keyClass, Configuration conf, boolean createInstances) { 排序:分數 分組:課程 ========================= computer huangjiaju 83.2-----------(null) computer huangjiaju 83.2-----------(null) computer liutao 83.0-----------(null) ========================= math huangxiaoming 83.0-----------(null) ========================= english huanglei 83.0-----------(null) ========================= computer liutao 83.0-----------(null) 先進行的排序 在進行的分組 排完序: computer huangjiaju 83.2-----------(null) computer huangjiaju 83.2-----------(null) computer liutao 83.0-----------(null) math huangxiaoming 83.0-----------(null) english huanglei 83.0-----------(null) computer liutao 83.0-----------(null) 分組在排序的基礎上進行的:判斷相鄰的 computer huangjiaju 83.2-----------(null) computer huangjiaju 83.2-----------(null) computer liutao 83.0-----------(null) math huangxiaoming 83.0-----------(null) english huanglei 83.0-----------(null) computer liutao 83.0-----------(null) 排序的時候需要將分組的欄位放在一起 排序: 課程 分數 分組: 課程 既有排序 a b 又有分組 c 排序:c a b 分組:c 排序的欄位中必須包含 分組欄位 分組欄位必須在排序欄位的前面
補充: reduce中的迭代器: 1)只能迭代一次 迭代的過程中指標的操作 只要迭代一次 指標調到末尾 2)迭代器中的資料對應的一組的所有的value 這個迭代器中每一個value都對應一個自己的key 111111111111111111111111111111computer huangzitao 72.42857142857143 computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) computer huangzitao 72.42857142857143-----------(null) computer huangxiaoming 72.42857142857143-----------(null) ==============computer huangxiaoming 72.42857142857143 每一組中的這個key是一直隨著value指標移動而移動的 每一個value都會對應一個key值