1. 程式人生 > 其它 >hive中控制map和reduce數量的簡單實現方法

hive中控制map和reduce數量的簡單實現方法

技術標籤:hivehivehdfs大資料hadoopspark

hive中控制map和reduce數量的簡單實現方法

轉載:http://www.edianedi.com/index.php/archives/65/

做個記錄

先說結論:

  由於mapreduce中沒有辦法直接控制map數量,所以只能曲線救國,通過設定每個map中處理的資料量進行設定;reduce是可以直接設定的。
控制map和reduce的引數

set mapred.max.split.size=256000000;        -- 決定每個map處理的最大的檔案大小,單位為B
set mapred.min.split.size.per.node=1;         -- 節點中可以處理的最小的檔案大小
set mapred.min.split.size.per.rack=1;         -- 機架中可以處理的最小的檔案大小
方法1
set mapred.reduce.tasks=10;  -- 設定reduce的數量
方法2
set hive.exec.reducers.bytes.per.reducer=1073741824 -- 每個reduce處理的資料量,預設1GB

補充說明:一個叢集可以有多個機架,一個機架有1至多個節點,這裡的叢集是mapreduce不是yarn,yarn沒有詳細瞭解過,另外如果想要實現map中的資料合併需要設定下面的引數,叢集預設就是這個格式

set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

需要確認的問題:
 a.我們該設定多少個map多少個reduce才合適?
  map數普遍是通過執行時長來確認的,至少應當保證每個map執行時長在1分鐘以上,太短的話意味著大量重複的jvm啟用和銷燬。具體設定要根據具體任務來處理,有些任務佔用cpu大,有些佔用io大。

  我這邊的話因為大任務經常搞了上千個map,作為小叢集影響還是蠻大的,所以只對監控到的hql產生過大的map和reduce進行調整,經過一些簡單測試,map數保證在三四百個其實不影響執行效率。

 b.設定了上面的引數會帶來什麼影響?
  設定map的話,影響不是很大,可能會帶來更多的叢集之間的io,畢竟要做節點之間的檔案合併
  設定reduce的話,如果使用mapred.reduce.tasks,這個影響就大了,至少會造成同一個session每一個mr的job的reduce都是這麼多個,而且reduce個數意味著最後的檔案數量的輸出,如果小檔案太多的話可以開啟reduce端的小檔案合併引數,set hive.merge.mapredfiles=true

1、控制map數量的三個引數的邏輯概念

  可以簡單的理解為叢集對一個表分割槽下面的檔案進行分發到各個節點,之後根據mapred.max.split.size確認要啟動多少個map數,邏輯如下
  a.假設有兩個檔案大小分別為(256M,280M)被分配到節點A,那麼會啟動兩個map,剩餘的檔案大小為10MB和35MB因為每個大小都不足241MB會先做保留
  b.根據引數set mapred.min.split.size.per.node看剩餘的大小情況並進行合併,如果值為1,表示a中每個剩餘檔案都會自己起一個map,這裡會起兩個,如果設定為大於4510241024則會合併成一個塊,併產生一個map
  如果mapred.min.split.size.per.node為1010241024,那麼在這個節點上一共會有4個map,處理的大小為(245MB,245MB,10MB,10MB,10MB,10MB),餘下9MB
  如果mapred.min.split.size.per.node為4510241024,那麼會有三個map,處理的大小為(245MB,245MB,45MB)
  實際中mapred.min.split.size.per.node無法準確地設定成4510241024,會有剩餘並保留帶下一步進行判斷處理
  c. 對b中餘出來的檔案與其它節點餘出來的檔案根據mapred.min.split.size.per.rack大小進行判斷是否合併,對再次餘出來的檔案獨自產生一個map處理
2、控制map數量的簡單實用方式

  我們執行一個hive語句,發現起了1000個map,這種情況對於當前的資料量來說是完全不必要的,同時還會影響其它使用者提交任務
  這個時候希望map減小到250個左右,很簡單
  將map處理的最大的檔案大小增大,256000000*4=1024000000
引數修改為如下

set mapred.max.split.size=1024000000;
set mapred.min.split.size.per.node=1024000000;
set mapred.min.split.size.per.rack=1024000000;

3、控制reduce引數

  修改reduce的個數就簡單很多,直接根據可能的情況作個簡單的判斷確認需要的reduce數量,如果無法判斷,根據當前map數減小10倍,保持在1~100個reduce即可(注意,這個不同的叢集需求不同哈)
設定引數如下
set mapred.reduce.tasks=10
不建議隨意設定reduce引數哈,可能調整引數更好一點
set hive.exec.reducers.bytes.per.reducer=1073741824
4、控制map和rduce數的說明

  1、因為set引數的設定是session級別的,Toad for Cloud(青蛙)第三方軟體中暫時沒有發現如何使set的引數設定有效的,所以請使用CRT等工具連線到linux的介面中進行使用,使用hive命令連線hive叢集,一次設定只要不結束這次連線,那麼引數是始終有效的。
  2、注意各引數的設定大小,不要衝突,否則會異常,大小順序如下
mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack
  3、這種減少map數的行為是否能帶來更短的執行時間,需要具體分析,map數也不是越少越好,減少了map數,單個map處理的資料量就上升,需要更多的時間,同時也因為需要合併節點內、節點間甚至機架之間的資料需要更多的IO和頻寬。
  引數的設定本質是根據檔案情況、系統情況、資料計算情況進行的一個平衡考慮,有取有舍,我們需要遵循的規則就是:使大資料量利用合適的map數;使單個map任務處理合適的資料量
  4、reduce數同3


1、hive.merge.mapfiles,True時會合並map輸出。
2、hive.merge.mapredfiles,True時會合並reduce輸出。
3、hive.merge.size.per.task,合併操作後的單個檔案大小。
4、hive.merge.size.smallfiles.avgsize,當輸出檔案平均大小小於設定值時,啟動合併操作。這一設定只有當hive.merge.mapfiles或hive.merge.mapredfiles設定為true時,才會對相應的操作有效。
5、mapred.reduce.tasks=30; 設定Reduce Task個數
6、hive.exec.compress.output=’false’; 設定資料不作壓縮,要是壓縮了我們拿出來的檔案就只能通過HIVE-JDBC來解析
7、mapred.map.tasks=1200;
8、hive.optimize.skewjoin=true;這個是給join優化的 0.6官方版本好像有個bug悲哀啊
9、hive.groupby.skewindata=true;這個是給groupby優化的

優化案例一:

使用的生產Hive環境的幾個引數配置如下:

        dfs.block.size=268435456

        hive.merge.mapredfiles=true

        hive.merge.mapfiles=true

        hive.merge.size.per.task=256000000

        mapred.map.tasks=2 

因為合併小檔案預設為true,而dfs.block.size與hive.merge.size.per.task的搭配使得合併後的絕大部分檔案都在300MB左右。

CASE 1:

    現在我們假設有3個300MB大小的檔案,那麼goalsize = min(900MB/2,256MB) = 256MB (具體如何計算map數請參見http://blog.sina.com.cn/s/blog_6ff05a2c010178qd.html)

    所以整個JOB會有6個map,其中3個map分別處理256MB的資料,還有3個map分別處理44MB的資料。

    這時候木桶效應就來了,整個JOB的map階段的執行時間不是看最短的1個map的執行時間,而是看最長的1個map的執行時間。所以,雖然有3個map分別只處理44MB的資料,可以很快跑完,但它們還是要等待另外3個處理256MB的map。顯然,處理256MB的3個map拖了整個JOB的後腿。

CASE 2:

    如果我們把mapred.map.tasks設定成6,再來看一下有什麼變化:

    goalsize = min(900MB/6,256MB) = 150MB

    整個JOB同樣會分配6個map來處理,每個map處理150MB的資料,非常均勻,誰都不會拖後腿,最合理地分配了資源,執行時間大約為CASE 1的59%(150/256) 

    案例分析:

    雖然mapred.map.tasks從2調整到了6,但是CASE 2並沒有比CASE 1多用map資源,同樣都是使用6個map。而CASE 2的執行時間約為CASE 1執行時間的59%。

從這個案例可以看出,對mapred.map.tasks進行自動化的優化設定其實是可以很明顯地提高作業執行效率的。

案例二(處理小檔案):

最近倉庫裡面新建了一張分割槽表,資料量大約是12億行,分割槽比較多,從2008年7月開始 一天一個分割槽。

配置了一個任務

對這個表進行group by 的時候 發現啟動了2800多個maps .

執行的時間也高大10分鐘。

然後我在hdfs檔案裡面看到 這個表的每個分割槽裡面都有20多個小檔案,每個檔案都不大 300KB--1MB

 

之前的hive的引數:

hive.merge.mapfiles=true

hive.merge.mapredfiles=false

hive.merge.rcfile.block.level=true

hive.merge.size.per.task=256000000

hive.merge.smallfiles.avgsize=16000000

hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

mapred.max.split.size=256000000

mapred.min.split.size=1

mapred.min.split.size.per.node=1

mapred.min.split.size.per.rack=1

 

hive.merge.mapredfiles 這個指的是 在Map-Reduce的任務結束時合併小檔案

解決辦法:

1.修改引數hive.merge.mapredfiles=true

2.通過map_reduece的辦法生成一張新的表 此時生成的檔案變成了每個分割槽一個檔案

 

再次執行group by 發現效率得到了大大的提升。

小結:

正確處理hive小檔案 是 控制map數的一個重要環節

處理的不好 會大大影響任務的執行效率