1. 程式人生 > >MapReduce優化_1

MapReduce優化_1

**

1、配置調優

**
調優總的原則給shuffle過程儘量多提供記憶體空間,在map端,可以通過避免多次溢位寫磁碟來獲得最佳效能(相關配置io.sort.*,io.sort.mb),在reduce端,中間資料全部駐留在記憶體時,就能獲得最佳效能,但是預設情況下,這是不可能發生的,因為一般情況所有記憶體都預留給reduce含函式(如需修改 需要配置mapred.inmem.merge.threshold,mapred.job.reduce.input.buffer.percent)

如果能夠根據情況對shuffle過程進行調優,對於提供MapReduce效能很有幫助。
一個通用的原則是給shuffle過程分配儘可能大的記憶體,當然你需要確保map和reduce有足夠的記憶體來執行業務邏輯。因此在實現Mapper和Reducer時,應該儘量減少記憶體的使用,例如避免在Map中不斷地疊加。
執行map和reduce任務的JVM,記憶體通過mapred.child.java.opts屬性來設定,儘可能設大記憶體。容器的記憶體大小通過mapreduce.map.memory.mb和mapreduce.reduce.memory.mb來設定,預設都是1024M。

可以通過以下方法提高排序和快取寫入磁碟的效率:

調整mapreduce.task.io.sort.mb大小,從而避免或減少快取溢位的數量。當調整這個引數時,最好同時檢測Map任務的JVM的堆大小,並必要的時候增加堆空間。
mapreduce.task.io.sort.factor屬性的值提高100倍左右,這可以使合併處理更快,並減少磁碟的訪問。
為K-V提供一個更高效的自定義序列化工具,序列化後的資料佔用空間越少,快取使用率就越高。
提供更高效的Combiner(合併器),使Map任務的輸出結果聚合效率更高。
提供更高效的鍵比較器和值的分組比較器。
輸出依賴於作業中Reduce任務的數量,下面是一些優化建議:

壓縮輸出,以節省儲存空間,同時也提升HDFS寫入吞吐量
避免寫入帶外端檔案(out-of-band side file)作為Reduce任務的輸出
根據作業輸出檔案的消費者的需求,可以分割的壓縮技術或許適合
以較大塊容量設定,寫入較大的HDFS檔案,有助於減少Map任務數
**

2、Map/Reduce端調優

**
通用優化
Hadoop預設使用4KB作為緩衝,這個算是很小的,可以通過io.file.buffer.size來調高緩衝池大小。

map端優化
避免寫入多個spill檔案可能達到最好的效能,一個spill檔案是最好的。通過估計map的輸出大小,設定合理的mapreduce.task.io.sort.*屬性,使得spill檔案數量最小。例如儘可能調大mapreduce.task.io.sort.mb。
在這裡插入圖片描述


reduce端優化
如果能夠讓所有資料都儲存在記憶體中,可以達到最佳的效能。通常情況下,記憶體都保留給reduce函式,但是如果reduce函式對記憶體需求不是很高,將mapreduce.reduce.merge.inmem.threshold(觸發合併的map輸出檔案數)設為0,mapreduce.reduce.input.buffer.percent(用於儲存map輸出檔案的堆記憶體比例)設為1.0,可以達到很好的效能提升。在TB級別資料排序效能測試中,Hadoop就是通過將reduce的中間資料都儲存在記憶體中勝利的。
在這裡插入圖片描述
在這裡插入圖片描述

**

3、Combiner與Partitioner

**
Combiner和Partitioner是用來優化MapReduce的。可以提高MapReduce的執行效率。

Combiner

叢集上的可用頻寬限制了MapReduce作業的數量,因此儘量避免map和reduce任務之間的資料傳輸是有利的。Hadoop允許使用者針對map任務的輸出指定一個combiner(就像mapper,reducer)。combiner函式的輸出作為reduce函式的輸入。由於combiner術語優化方案,所以Hadoop無法確定對map任務輸出記錄呼叫多少次combiner(如果需要)。換言之,不管呼叫多次combiner,reducer的輸出結果都是一樣的。

首先通過下面的示意圖直觀的瞭解一下Combiner的位置和作用。
從下圖可以看出,Combiner介於 Mapper和Reducer之間,combine作為 Map任務的一部分,執行完 map 函式後緊接著執行combine,而reduce 必須在所有的 Map 任務完成後才能進行。 而且還可以看出combine的過程與reduce的過程類似,都是對相同的單詞key合併其詞頻,很多情況下可以直接使用reduce函式來完成Combiner過程。
在這裡插入圖片描述
Combiner解析:

  1. ombiner可以看做區域性的Reducer(local reducer)。
  2. Combiner作用是合併相同的key對應的value。
  3. 在Mapper階段,不管Combiner被呼叫多少次,都不應改變 Reduce的輸出結果。
  4. Combiner通常與Reducer的邏輯是一樣的,一般情況下不需要單獨編寫Combiner,直接使用Reducer的實現就可以了。
  5. Combiner在Job中是如下設定的。 job.setCombinerClass(Reducer.class);//Combiner一般情況下,預設使用Reducer的實現

Combiner的優點

能夠減少Map Task輸出的資料量(即磁碟IO)。對spill,merge檔案都可以進行壓縮。
中間結果非常大導致IO成為瓶頸時壓縮非常有用,可以通過mapreduce.map.output.compress(default:false)設定為true進行壓縮,資料會被壓縮寫入磁碟,讀資料讀的是壓縮資料需要解壓,在實際經驗中Hive在Hadoop的執行的瓶頸一般都是IO而不是CPU,壓縮一般可以10倍的減少IO操作,壓縮的方式Gzip,Lzo,BZip2,Lzma等,其中Lzo是一種比較平衡選擇,mapreduce.map.output.compress.codec(default:org.apache.hadoop.io.compress.DefaultCodec)引數設定。但這個過程會消耗CPU,適合IO瓶頸比較大。
能夠減少Reduce-Map網路傳輸的資料量(網路IO)。Map Task 輸出越少,Reduce從Map結果中拉取的資料量就越少,自然就減少了網路傳輸的資料量。
Combiner的使用場景

並不是所有的場景都可以使用Combiner,必須滿足結果可以累加。
適合於Sum()求和,並不適合Average()求平均數。
例如,求0、20、10、25和15的平均數,直接使用Reduce求平均數Average(0,20,10,25,15),得到的結果是14, 如果先使用Combiner分別對不同Mapper結果求平均數,Average(0,20,10)=10,Average(25,15)=20,再使用Reducer求平均數Average(10,20),得到的結果為15,很明顯求平均數並不適合使用Combiner。
**

Partitioner

**
Partitioner 處於 Mapper階段,當Mapper處理好資料後,這些資料需要經過Partitioner進行分割槽,來選擇不同的Reducer處理,從而將Mapper的輸出結果均勻的分佈在Reducer上面執行。

對於map輸出的每一個鍵值對,系統都會給定一個partition,partition值預設通過計算key的hash值後對Reduce task的數量取模獲得。如果一個鍵值對的partition值為1,意味著這個鍵值對會交給第一個Reducer處理。
在這裡插入圖片描述

Partitioner解析:

Partitioner決定了Map Task 輸出的每條資料交給哪個Reduce Task 來處理。Partitioner 有兩個功能:
1) 均衡負載。它儘量將工作均勻地分配給不同的 Reduce。
2)效率。它的分配速度一定要非常快。

Partitioner 的預設實現:hash(key) mod R,這裡的R代表Reduce Task 的數目,意思就是對key進行hash處理然後取模。很多情況下,使用者需要自定義 Partitioner,比如“hash(hostname(URL)) mod R”,它確保相同域名下的網頁交給同一個 Reduce Task 來處理。 使用者自定義Partitioner,需要繼承Partitioner類,實現它提供的一個方法。

public class WordCountPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // TODO Auto-generated method stub
        int a = key.hashCode()%numPartitions;
        if(a>=0)
            return a;
        else 
            return 0;
    }
}

**

自定義partitioner

**

每一個Reduce的輸出都是有序的,但是將所有Reduce的輸出合併到一起卻並非是全域性有序的,如果要做到全域性有序,我們該怎麼做呢?最簡單的方式,只設置一個Reduce task,但是這樣完全發揮不出叢集的優勢,而且能應對的資料量也很受限。最佳的方式是自己定義一個Partitioner,用輸入資料的最大值除以系統Reduce task數量的商作為分割邊界,也就是說分割資料的邊界為此商的1倍、2倍至numPartitions-1倍,這樣就能保證執行partition後的資料是整體有序的。

解決資料傾斜:另一種需要我們自己定義一個Partitioner的情況是各個Reduce task處理的鍵值對數量極不平衡。對於某些資料集,由於很多不同的key的hash值都一樣,導致這些鍵值對都被分給同一個Reducer處理,而其他的Reducer處理的鍵值對很少,從而拖延整個任務的進度。當然,編寫自己的Partitioner必須要保證具有相同key值的鍵值對分發到同一個Reducer。

自定義的Key包含了好幾個欄位,比如自定義key是一個物件,包括type1,type2,type3,只需要根據type1去分發資料,其他欄位用作二次排序。