Hive優化
一、map階段優化
map端: spill(100M,80%)-->meger(壓縮)參數:io.sort.mb(default100)
當map task開始運算,並產生中間數據時,其產生的中間結果並非直接就簡單的寫入磁盤。
而是會利用到了內存buffer來進行已經產生的部分結果的緩存,
並在內存buffer中進行一些預排序來優化整個map的性能。
每一個map都會對應存在一個內存buffer,map會將已經產生的部分結果先寫入到該buffer中,
這個buffer默認是100MB大小,但是這個大小是可以根據job提交時的參數設定來調整的,
當map的產生數據非常大時,並且把io.sort.mb調大,
那麽map在整個計算過程中spill的次數就勢必會降低,
maptask對磁盤的操作就會變少,
如果map tasks的瓶頸在磁盤上,這樣調整就會大大提高map的計算性能。
參數:o.sort.spill.percent(default0.80,也就是80%)
map在運行過程中,不停的向該buffer中寫入已有的計算結果,
但是該buffer並不一定能將全部的map輸出緩存下來,
當map輸出超出一定閾值(比如100M),那麽map就必須將該buffer中的數據寫入到磁盤中去,
這個過程在mapreduce中叫做spill。
因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的情況。
所以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。
這個閾值也是由一個job的配置參數來控制,
這個參數同樣也是影響spill頻繁程度,進而影響maptask運行周期對磁盤的讀寫頻率的。
但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對用戶來說更加方便。
參數:io.sort.factor
當map task的計算部分全部完成後,如果map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結果。
map在正常退出之前,需要將這些spill合並(merge)成一個,所以map在結束之前還有一個merge的過程。
merge的過程中,有一個參數可以調整這個過程的行為,該參數為:io.sort.factor。
該參數默認為10。它表示當merge spill文件時,最多能有多少並行的stream向merge文件中寫入。
比如如果map產生的數據非常的大,產生的spill文件大於10,而io.sort.factor使用的是默認的10,
那麽當map計算完成做merge時,就沒有辦法一次將所有的spill文件merge成一個,而是會分多次,每次最多10個stream。
有利於減少merge次數,進而減少map對磁盤的讀寫頻率,有可能達到優化作業的目的。
參數:min.num.spill.for.combine(default3)
當job指定了combiner的時候,我們都知道map介紹後會在map端根據combiner定義的函數將map結果進行合並。
運行combiner函數的時機有可能會是merge完成之前,或者之後,這個時機可以由一個參數控制,
即min.num.spill.for.combine(default3),當job中設定了combiner,並且spill數最少有3個的時候,
那麽combiner函數就會在merge產生結果文件之前運行。
通過這樣的方式,就可以在spill非常多需要merge,並且很多數據需要做conbine的時候,
減少寫入到磁盤文件的數據數量,同樣是為了減少對磁盤的讀寫頻率,有可能達到優化作業的目的。
參數:mapred.compress.map.output(defaultfalse)
減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。
也就是說map的中間,無論是spill的時候,還是最後merge產生的結果文件,都是可以壓縮的。
壓縮的好處在於,通過壓縮減少寫入讀出磁盤的數據量。
對中間結果非常大,磁盤速度成為map執行瓶頸的job,尤其有用。
控制map中間結果是否使用壓縮的參數為:mapred.compress.map.output(true/false)。
將這個參數設置為true時,那麽map在寫中間結果時,就會將數據壓縮後再寫入磁盤,讀結果時也會采用先解壓後讀取數據。
這樣做的後果就是:寫入磁盤的中間結果數據量會變少,但是cpu會消耗一些用來壓縮和解壓。
所以這種方式通常適合job中間結果非常大,瓶頸不在cpu,而是在磁盤的讀寫的情況。
說的直白一些就是用cpu換IO。
根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常復雜。所以對中間結果采用壓縮通常來說是有收益的。
參數:mapred.map.output.compression.codec(default org.apache.hadoop.io.compress.DefaultCodec)
當采用map中間結果壓縮的情況下,用戶還可以選擇壓縮時采用哪種壓縮格式進行壓縮,
現在hadoop支持的壓縮格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。
通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec比較適合。但也要取決於job的具體情況。
用戶若想要自行選擇中間結果的壓縮算法,
可以設置配置參數:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用戶自行選擇的壓縮方式
二、reduce階段優化
reduce的運行是分成三個階段的。分別為copy->sort->reduce。
由於job的每一個map都會根據reduce(n)數將數據分成map輸出結果分成n個partition,
所以map的中間結果中是有可能包含每一個reduce需要處理的部分數據的。
所以,為了優化reduce的執行時間,hadoop中是等job的第一個map結束後,
所有的reduce就開始嘗試從完成的map中下載該reduce對應的partition部分數據。
這個過程就是通常所說的shuffle,也就是copy過程。
參數:mapred.reduce.parallel.copies(default5)
說明:每個reduce並行下載map結果的最大線程數。
Reduce task在做shuffle時,實際上就是從不同的已經完成的map上去下載屬於自己這個reduce的部分數據,
由於map通常有許多個,所以對一個reduce來說,下載也可以是並行的從多個map下載,這個並行度是可以調整的,
調整參數為:mapred.reduce.parallel.copies(default5)。
默認情況下,每個只會有5個並行的下載線程在從map下數據,如果一個時間段內job完成的map有100個或者更多,
那麽reduce也最多只能同時下載5個map的數據,
所以這個參數比較適合map很多並且完成的比較快的job的情況下調大,有利於reduce更快的獲取屬於自己部分的數據。
參數:mapred.reduce.copy.backoff(default300秒)
說明:reduce下載線程最大等待時間(秒)
reduce的每一個下載線程在下載某個map數據的時候,有可能因為那個map中間結果所在機器發生錯誤,
或者中間結果的文件丟失,或者網絡瞬斷等等情況,這樣reduce的下載就有可能失敗,
所以reduce的下載線程並不會無休止的等待下去,當一定時間後下載仍然失敗,那麽下載線程就會放棄這次下載,
並在隨後嘗試從另外的地方下載(因為這段時間map可能重跑)。
所以reduce下載線程的這個最大的下載時間段是可以調整的,
調整參數為:mapred.reduce.copy.backoff(default300秒)。
如果集群環境的網絡本身是瓶頸,那麽用戶可以通過調大這個參數來避免reduce下載線程被誤判為失敗的情況。
不過在網絡環境比較好的情況下,沒有必要調整。通常來說專業的集群網絡不應該有太大問題,所以這個參數需要調整的情況不多。
參數:io.sort.factor
Reduce將map結果下載到本地時,同樣也是需要進行merge的,所以io.sort.factor的配置選項同樣會影響reduce進行merge時的行為,
該參數的詳細介紹上文已經提到,當發現reduce在shuffle階段iowait非常的高的時候,就有可能通過調大這個參數來加大一次merge時的並發吞吐,優化reduce效率。
參數:mapred.job.shuffle.input.buffer.percent(default0.7)
說明:用來緩存shuffle數據的reducetask heap百分比
Reduce在shuffle階段對下載來的map數據,並不是立刻就寫入磁盤的,而是會先緩存在內存中,然後當使用內存達到一定量的時候才刷入磁盤。
這個內存大小的控制就不像map一樣可以通過io.sort.mb來設定了,而是通過另外一個參數來設置:mapred.job.shuffle.input.buffer.percent(default0.7),
這個參數其實是一個百分比,意思是說,shuffile在reduce內存中的數據最多使用內存量為:0.7× maxHeap of reduce task。
也就是說,如果該reducetask的最大heap使用量(通常通過mapred.child.java.opts來設置,比如設置為-Xmx1024m)的一定比例用來緩存數據。
默認情況下,reduce會使用其heapsize的70%來在內存中緩存數據。
如果reduce的heap由於業務原因調整的比較大,相應的緩存大小也會變大,這也是為什麽reduce用來做緩存的參數是一個百分比,而不是一個固定的值了。
參數:mapred.job.shuffle.merge.percent(default0.66)
說明:緩存的內存中多少百分比後開始做merge操作
假設mapred.job.shuffle.input.buffer.percent為0.7,reducetask的max heapsize為1G,
那麽用來做下載數據緩存的內存就為大概700MB左右,這700M的內存,跟map端一樣,
也不是要等到全部寫滿才會往磁盤刷的,而是當這700M中被使用到了一定的限度(通常是一個百分比),就會開始往磁盤刷。
這個限度閾值也是可以通過job參數來設定的,設定參數為:mapred.job.shuffle.merge.percent(default0.66)。
如果下載速度很快,很容易就把內存緩存撐大,那麽調整一下這個參數有可能會對reduce的性能有所幫助。
參數:mapred.job.reduce.input.buffer.percent(default0.0)
說明:sort完成後reduce計算階段用來緩解數據的百分比
當reduce將所有的map上對應自己partition的數據下載完成後,就會開始真正的reduce計算階段
(中間有個sort階段通常時間非常短,幾秒鐘就完成了,因為整個下載階段就已經是邊下載邊sort,然後邊merge的)。
當reducetask真正進入reduce函數的計算階段的時候,有一個參數也是可以調整reduce的計算行為。
也就是:mapred.job.reduce.input.buffer.percent(default0.0)。
由於reduce計算時肯定也是需要消耗內存的,而在讀取reduce需要的數據時,同樣是需要內存作為buffer,
這個參數是控制,需要多少的內存百分比來作為reduce讀已經sort好的數據的buffer百分比。
默認情況下為0,也就是說,默認情況下,reduce是全部從磁盤開始讀處理數據。
如果這個參數大於0,那麽就會有一定量的數據被緩存在內存並輸送給reduce,
當reduce計算邏輯消耗內存很小時,可以分一部分內存用來緩存數據,反正reduce的內存閑著也是閑著。
mapred.reduce.slowstart.completed.maps(map完成多少百分比時,開始shuffle)
當map運行慢,reduce運行很快時,如果不設置mapred.reduce.slowstart.completed.maps會使job的shuffle時間變的很長,
map運行完很早就開始了reduce,導致reduce的slot一直處於被占用狀態。mapred.reduce.slowstart.completed.maps這個值是
和“運行完的map數除以總map數”做判斷的,當後者大於等於設定的值時,開始reduce的shuffle。所以當map比reduce的執行
時間多很多時,可以調整這個值(0.75,0.80,0.85及以上)
下面從流程裏描述一下各個參數的作用:
當maptask開始運算,並產生中間數據時,其產生的中間結果並非直接就簡單的寫入磁盤。這中間的過程比較復雜,並且利用到了
內存buffer來進行已經產生的部分結果的緩存,並在內存buffer中進行一些預排序來優化整個map的性能。每一個map都會對應存
在一個內存buffer(MapOutputBuffer),map會將已經產生的部分結果先寫入到該buffer中,這個buffer默認是100MB大小,但
是這個大小是可以根據job提交時的參數設定來調整的,該參數即為:io.sort.mb。當map的產生數據非常大時,並且把io.sort.mb
調大,那麽map在整個計算過程中spill的次數就勢必會降低,maptask對磁盤的操作就會變少,如果map tasks的瓶頸在磁盤上,
這樣調整就會大大提高map的計算性能。
map在運行過程中,不停的向該buffer中寫入已有的計算結果,但是該buffer並不一定能將全部的map輸出緩存下來,當map輸出
超出一定閾值(比如100M),那麽map就必須將該buffer中的數據寫入到磁盤中去,這個過程在mapreduce中叫做spill。map並
不是要等到將該buffer全部寫滿時才進行spill,因為如果全部寫滿了再去寫spill,勢必會造成map的計算部分等待buffer釋放空間的
情況。所以,map其實是當buffer被寫滿到一定程度(比如80%)時,就開始進行spill。這個閾值也是由一個job的配置參數來控
制,即io.sort.spill.percent,默認為0.80或80%。這個參數同樣也是影響spill頻繁程度,進而影響maptask運行周期對磁盤的讀寫
頻率的。但非特殊情況下,通常不需要人為的調整。調整io.sort.mb對用戶來說更加方便。
當maptask的計算部分全部完成後,如果map有輸出,就會生成一個或者多個spill文件,這些文件就是map的輸出結果。map在正
常退出之前,需要將這些spill合並(merge)成一個,所以map在結束之前還有一個merge的過程。merge的過程中,有一個參數
可以調整這個過程的行為,該參數為:io.sort.factor。該參數默認為10。它表示當mergespill文件時,最多能有多少並行的stream
向merge文件中寫入。比如如果map產生的數據非常的大,產生的spill文件大於10,而io.sort.factor使用的是默認的10,那麽當
map計算完成做merge時,就沒有辦法一次將所有的spill文件merge成一個,而是會分多次,每次最多10個stream。這也就是說,
當map的中間結果非常大,調大io.sort.factor,有利於減少merge次數,進而減少map對磁盤的讀寫頻率,有可能達到優化作業的
目的。
當job指定了combiner的時候,我們都知道map介紹後會在map端根據combiner定義的函數將map結果進行合並。運行combiner
函數的時機有可能會是merge完成之前,或者之後,這個時機可以由一個參數控制,即min.num.spill.for.combine(default3),
當job中設定了combiner,並且spill數最少有3個的時候,那麽combiner函數就會在merge產生結果文件之前運行。通過這樣的方
式,就可以在spill非常多需要merge,並且很多數據需要做conbine的時候,減少寫入到磁盤文件的數據數量,同樣是為了減少對磁
盤的讀寫頻率,有可能達到優化作業的目的。
減少中間結果讀寫進出磁盤的方法不止這些,還有就是壓縮。也就是說map的中間,無論是spill的時候,還是最後merge產生的結
果文件,都是可以壓縮的。壓縮的好處在於,通過壓縮減少寫入讀出磁盤的數據量。對中間結果非常大,磁盤速度成為map執行瓶
頸的job,尤其有用。控制map中間結果是否使用壓縮的參數為:mapred.compress.map.output(true/false)。將這個參數設置為
true時,那麽map在寫中間結果時,就會將數據壓縮後再寫入磁盤,讀結果時也會采用先解壓後讀取數據。這樣做的後果就是:寫
入磁盤的中間結果數據量會變少,但是cpu會消耗一些用來壓縮和解壓。所以這種方式通常適合job中間結果非常大,瓶頸不在
cpu,而是在磁盤的讀寫的情況。說的直白一些就是用cpu換IO。根據觀察,通常大部分的作業cpu都不是瓶頸,除非運算邏輯異常
復雜。所以對中間結果采用壓縮通常來說是有收益的。
當采用map中間結果壓縮的情況下,用戶還可以選擇壓縮時采用哪種壓縮格式進行壓縮,現在hadoop支持的壓縮格式有:
GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等壓縮格式。通常來說,想要達到比較平衡的cpu和磁盤壓縮比,LzoCodec
比較適合。但也要取決於job的具體情況。用戶若想要自行選擇中間結果的壓縮算法,可以設置配置參數:
mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用戶自行選擇的壓縮方式。
Hive優化