1. 程式人生 > >Hive on Spark調優

Hive on Spark調優

之前在Hive on SparkTPCx-BB測試時,100g的資料量要跑十幾個小時,一看CPU和記憶體的監控,發現 POWER_TEST階段(依次執行30個查詢)CPU只用了百分之十幾,也就是沒有把整個叢集的效能利用起來,導致跑得很慢。因此,如何調整引數,使整個叢集發揮最大效能顯得尤為重要。

Spark作業執行原理

spark-base-mech.jpg

詳細原理見上圖。我們使用spark-submit提交一個Spark作業之後,這個作業就會啟動一個對應的Driver程序。根據你使用的部署模式(deploy-mode)不同,Driver程序可能在本地啟動,也可能在叢集中某個工作節點上啟動。Driver程序本身會根據我們設定的引數,佔有一定數量的記憶體和CPU core。而Driver程序要做的第一件事情,就是向叢集管理器(可以是Spark Standalone叢集,也可以是其他的資源管理叢集,美團•大眾點評使用的是YARN作為資源管理叢集)申請執行Spark作業需要使用的資源,這裡的資源指的就是Executor程序。YARN叢集管理器會根據我們為Spark作業設定的資源引數,在各個工作節點上,啟動一定數量的Executor程序,每個Executor程序都佔有一定數量的記憶體和CPU core。

Spark是根據shuffle類運算元來進行stage的劃分。如果我們的程式碼中執行了某個shuffle類運算元(比如reduceByKey、join等),那麼就會在該運算元處,劃分出一個stage界限來。可以大致理解為,shuffle運算元執行之前的程式碼會被劃分為一個stage,shuffle運算元執行以及之後的程式碼會被劃分為下一個stage。因此一個stage剛開始執行的時候,它的每個task可能都會從上一個stage的task所在的節點,去通過網路傳輸拉取需要自己處理的所有key,然後對拉取到的所有相同的key使用我們自己編寫的運算元函式執行聚合操作(比如reduceByKey()運算元接收的函式)。這個過程就是shuffle。

task的執行速度是跟每個Executor程序的CPU core數量有直接關係的。一個CPU core同一時間只能執行一個執行緒。而每個Executor程序上分配到的多個task,都是以每個task一條執行緒的方式,多執行緒併發執行的。如果CPU core數量比較充足,而且分配到的task數量比較合理,那麼通常來說,可以比較快速和高效地執行完這些task執行緒。

以上就是Spark作業的基本執行原理的說明,大家可以結合上圖來理解。理解作業基本原理,是我們進行資源引數調優的基本前提。

引數調優

瞭解完了Spark作業執行的基本原理之後,對資源相關的引數就容易理解了。所謂的Spark資源引數調優,其實主要就是對Spark執行過程中各個使用資源的地方,通過調節各種引數,來優化資源使用的效率,從而提升Spark作業的執行效能。以下引數就是Spark中主要的資源引數,每個引數都對應著作業執行原理中的某個部分。

num-executors/spark.executor.instances

  • 引數說明:該引數用於設定Spark作業總共要用多少個Executor程序來執行。Driver在向YARN叢集管理器申請資源時,YARN叢集管理器會盡可能按照你的設定來在叢集的各個工作節點上,啟動相應數量的Executor程序。這個引數非常之重要,如果不設定的話,預設只會給你啟動少量的Executor程序,此時你的Spark作業的執行速度是非常慢的。

  • 引數調優建議:每個Spark作業的執行一般設定50~100個左右的Executor程序比較合適,設定太少或太多的Executor程序都不好。設定的太少,無法充分利用叢集資源;設定的太多的話,大部分佇列可能無法給予充分的資源。

executor-memory/spark.executor.memory

  • 引數說明:該引數用於設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的效能,而且跟常見的JVM OOM異常,也有直接的關聯。

  • 引數調優建議:每個Executor程序的記憶體設定4G~8G較為合適。但是這只是一個參考值,具體的設定還是得根據不同部門的資源佇列來定。可以看看自己團隊的資源佇列的最大記憶體限制是多少,num-executors乘以executor-memory,是不能超過佇列的最大記憶體量的。此外,如果你是跟團隊裡其他人共享這個資源佇列,那麼申請的記憶體量最好不要超過資源佇列最大總記憶體的1/3~1/2,避免你自己的Spark作業佔用了佇列所有的資源,導致別的同學的作業無法執行。

executor-cores/spark.executor.cores

  • 引數說明:該引數用於設定每個Executor程序的CPU core數量。這個引數決定了每個Executor程序並行執行task執行緒的能力。因為每個CPU core同一時間只能執行一個task執行緒,因此每個Executor程序的CPU core數量越多,越能夠快速地執行完分配給自己的所有task執行緒。

  • 引數調優建議:Executor的CPU core數量設定為2~4個較為合適。同樣得根據不同部門的資源佇列來定,可以看看自己的資源佇列的最大CPU core限制是多少,再依據設定的Executor數量,來決定每個Executor程序可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個佇列,那麼num-executors * executor-cores不要超過佇列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業執行。

driver-memory

  • 引數調優建議:Driver的記憶體通常來說不設定,或者設定1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect運算元將RDD的資料全部拉取到Driver上進行處理,那麼必須確保Driver的記憶體足夠大,否則會出現OOM記憶體溢位的問題。

spark.default.parallelism

  • 引數說明:該引數用於設定每個stage的預設task數量。這個引數極為重要,如果不設定可能會直接影響你的Spark作業效能。

  • 引數調優建議:Spark作業的預設task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設定這個引數,那麼此時就會導致Spark自己根據底層HDFS的block數量來設定task的數量,預設是一個HDFS block對應一個task。通常來說,Spark預設設定的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設定好的Executor的引數都前功盡棄。試想一下,無論你的Executor程序有多少個,記憶體和CPU有多大,但是task只有1個或者10個,那麼90%的Executor程序可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設定原則是,設定該引數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那麼設定1000個task是可以的,此時可以充分地利用Spark叢集的資源。

spark.storage.memoryFraction

  • 引數說明:該引數用於設定RDD持久化資料在Executor記憶體中能佔的比例,預設是0.6。也就是說,預設Executor 60%的記憶體,可以用來儲存持久化的RDD資料。根據你選擇的不同的持久化策略,如果記憶體不夠時,可能資料就不會持久化,或者資料會寫入磁碟。

  • 引數調優建議:如果Spark作業中,有較多的RDD持久化操作,該引數的值可以適當提高一些,保證持久化的資料能夠容納在記憶體中。避免記憶體不夠快取所有的資料,導致資料只能寫入磁碟中,降低了效能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那麼這個引數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的gc導致執行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。

spark.shuffle.memoryFraction

  • 引數說明:該引數用於設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2。也就是說,Executor預設只有20%的記憶體用來進行該操作。shuffle操作在進行聚合時,如果發現使用的記憶體超出了這個20%的限制,那麼多餘的資料就會溢寫到磁碟檔案中去,此時就會極大地降低效能。

  • 引數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的記憶體佔比,提高shuffle操作的記憶體佔比比例,避免shuffle過程中資料過多時記憶體不夠用,必須溢寫到磁碟上,降低了效能。此外,如果發現作業由於頻繁的gc導致執行緩慢,意味著task執行使用者程式碼的記憶體不夠用,那麼同樣建議調低這個引數的值。

調優過程

資料量:10g

螢幕快照 2016-09-29 上午10.39.00.png

可以看出:

  • 隨著每個executor佔用的CPU core數增加,q04查詢的時間顯著下降,q03也下降,但幅度沒那麼大。

本次調優只設置了spark.executor.memoryspark.executor.cores兩個引數,沒有涉及到spark.executor.instances引數,而預設的spark.executor.instances為2,也就是每個作業只用到2個executor,因此還沒將效能發揮到最佳。

接下來採用100g的資料量,並且增加spark.executor.instances引數的設定。

資料量:100g

螢幕快照 2016-09-29 上午10.51.55.png

可以看出:

  • 調優前後查詢時間有了很大的飛躍;

  • 增加spark.executor.instances設定項指定每個作業佔用的executor個數後效能又有很大提升(通過監控我們發現此時CPU利用率平均有好幾十,甚至可以高到百分之九十幾);

  • 至此,我們終於將整個叢集效能充分發揮出來,達到目的。

最後一列配置項是根據美團技術團隊部落格的建議設定的,可以看出效能相比我們之前自己的設定還是有一定提升的,至少該部落格裡建議的設定是比較通用的,因此之後我們都採取最後一列的設定來跑TPCx-BB測試。

最後來張大圖展示調優前和調優後跑100g資料的對比:

調優前後100g.jpg.png

可以看出:

  • 絕大多數查詢調優前後查詢時間有了極大的飛躍;

  • 但是像q01/q04/q14…這幾個查詢,可能因為查詢涉及到的表比較小,調優前時間就很短,因此調優後也看不出很多差別,如果想看到大的差別,可能需要提高資料量,比如1T,3T;

  • q10和q18調優前後時間都較長,而且調優後效能沒有提升,需要再深入探索下是什麼原因。

最後,用調優後的叢集,分別跑10g、30g、100g的資料,結果如下:

10g、30g、100g.jpg.png

可以看出:

  • 隨著資料量增大,很多查詢時間並沒有明顯增加,可能是因為叢集效能太強,而且資料量還不夠大,可以增大資料量繼續觀察

  • 對於q10、q18和q30,隨著資料量增大,時間明顯增大,需再深入分析

    hive on spark引數配置樣例

    set hive.execution.engine=spark;
    set spark.executor.memory=4g;
    set spark.executor.cores=2;
    set spark.executor.instances=40;
    set spark.serializer=org.apache.spark.serializer.KryoSerializer;