1. 程式人生 > 實用技巧 >SparkSubmit引數及引數效能調優

SparkSubmit引數及引數效能調優

首先擺出常用的引數設定

bin/spark-submit \
--class com.xyz.bigdata.calendar.PeriodCalculator \
--master yarn \
--deploy-mode cluster \
--queue default_queue \
--num-executors 50 \
--executor-cores 2 \
--executor-memory 4G \
--driver-memory 2G \
--conf "spark.default.parallelism=250" \
--conf "spark.shuffle.memoryFraction=0.3
" \ --conf "spark.storage.memoryFraction=0.5" \ --conf "spark.driver.extraJavaOptions=-XX:+UseG1GC" \ --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \ --verbose \ ${PROJECT_DIR}/bigdata-xyz-0.1.jar

關於spark-submit的執行過程,讀Spark Core的原始碼能夠獲得一個大致的印象。今天事情比較多,所以之後會另寫文章專門敘述關於Spark on YARN的事情(又挖了一個坑,上一個坑是關於Java String和JVM的,需要儘快填上了)。

num-executors

  • 含義:設定Spark作業要用多少個Executor程序來執行。
  • 設定方法:根據我們的實踐,設定在30~100個之間為最佳。如果不設定,預設只會啟動非常少的Executor。如果設得太小,無法充分利用計算資源。設得太大的話,又會搶佔叢集或佇列的資源,導致其他作業無法順利執行。

executor-cores

  • 含義:設定每個Executor能夠利用的CPU核心數(這裡核心指的是vCore)。核心數越多,並行執行Task的效率也就越高。
  • 設定方法:根據我們的實踐,設定在2~6之間都是可以的,主要是根據業務型別和資料處理邏輯的複雜程度來定,一般來講設2或者3就夠用了。需要注意的是,num-executors * executor-cores不能將佇列中的CPU資源耗盡,最好不要超過總vCore數的1/3,以給其他作業留下剩餘資源。

executor-memory

  • 含義:設定每個Executor的記憶體量(堆內記憶體)。這個引數比executor-cores更為重要,因為Spark作業的本質就是記憶體計算,記憶體的大小直接影響效能,並且與磁碟溢寫、OOM等都相關。
  • 設定方法:一般設定在2G~8G之間,需要根據資料量慎重做決定。如果作業執行非常慢,出現頻繁GC或者OOM,就得適當調大記憶體。並且與上面相同,num-executors * executor-memory也不能過大,最好不要超過佇列總記憶體量的一半。
    另外,還有一個配置項spark.executor.memoryOverhead,用來設定每個Executor可使用的堆外記憶體大小,預設值是executor-memory的0.1倍,最小值384M。一般來講都夠用,不用特意設定。

driver-memory

  • 含義:設定Driver程序的記憶體量(堆內記憶體)。
  • 設定方法:由於我們幾乎不會使用collect()之類的運算元把大量RDD資料都拉到Driver上來處理,所以它的記憶體可以不用設得過大,2G可以應付絕大多數情況。但是,如果Spark作業處理完後資料膨脹比較多,那麼還是應該酌情加大這個值。
    與上面一項相同,spark.driver.memoryOverhead用來設定Driver可使用的堆外記憶體大小。

spark.default.parallelism

  • 含義:對於shuffle運算元,如reduceByKey()或者join(),這個引數用來指定父RDD中最大分割槽數。由於分割槽與Task有一一對應關係,因此也可以理解為Task數。其名稱的字面意義是“並行度”,不能直接表達出這種含義。
  • 設定方法:Spark官方文件中推薦每個CPU core執行2~3個Task比較合適,因此這個值要設定為(num-executors * executor-cores)的2~3倍。這個引數同樣非常重要,因為如果不設定的話,分割槽數就會由RDD本身的分割槽來決定,這樣往往會使得計算效率低下。

spark.shuffle.memoryFraction

  • 含義:shuffle操作(聚合、連線、分組等等)能夠使用的可用堆記憶體(堆大小減去300MB保留空間)的比例,預設值是0.2。如果shuffle階段使用的記憶體比例超過這個值,就會溢寫到磁碟。
  • 設定方法:取決於計算邏輯中shuffle邏輯的複雜度,如果會產生大量資料,那麼一定要調高。在我們的實踐中,一般都設定在0.3左右。但是,如果調太高之後發現頻繁GC,那麼就是執行使用者程式碼的execution記憶體不夠用了,適當降低即可。

spark.storage.memoryFraction

  • 含義:快取操作(persist/cache)能夠使用的可用堆記憶體的比例,預設值是0.6。
  • 設定方法:如果經常需要快取非常大的RDD,那麼就需要調高。否則,如果shuffle操作更為重量級,適當調低也無妨。我們一般設定在0.5左右。

其實,spark.shuffle/storage.memoryFraction是舊版的靜態記憶體管理(StaticMemoryManager)的遺產。在Spark 1.6版本之後的文件中已經標記成了deprecated。目前取代它們的是spark.memory.fraction和spark.memory.storageFraction這兩項,參考新的統一記憶體管理(UnifiedMemoryManager)機制可以得到更多細節。
前者的含義是總記憶體佔堆的比例,即execution+storage+shuffle記憶體的總量。後者則是storage記憶體佔前者的比例。預設值分別為0.75(最新版變成了0.6)和0.5。

spark.driver/executor.extraJavaOptions

  • 含義:Driver或Executor程序的其他JVM引數。
  • 設定方法:一般可以不設定。如果設定,常見的情景是使用-Xmn加大年輕代記憶體的大小,或者手動指定垃圾收集器(最上面的例子中使用了G1,也有用CMS的時候)及其相關引數。

舉例

舉例,以我上一個老東家的引數命令,假設資料量400G,要求1個半小時內跑完

spark-submit --master yarn \
--class com.sddt.spark.web.WebETL \
--driver-memory 10G \
--executor-memory 24G \
--num-executors 20 \
--executor-cores 4 \
--conf spark.web.etl.inputBaseDir=hdfs://master:9999/user/hive/warehouse/rawdata.db/web \
--conf spark.web.etl.outputBaseDir=hdfs://master:9999/user/hadoop-twq/traffic-analysis/web \
--conf spark.web.etl.startDate=20180617 \
--conf spark.driver.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master" \
--conf spark.executor.extraJavaOptions="-Dweb.metadata.mongodbAddr=192.168.1.102 -Dweb.etl.hbase.zk.quorums=master -Dcom.sun.management.jmxremote.port=1119 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" \
/home/hadoop-twq/traffice-analysis/jars/spark-sessionization-etl-1.0-SNAPSHOT-jar-with-dependencies.jar prod

  

效能調優

Spark效能調優的第一步,就是為任務分配更多的資源,在一定範圍內,增加資源的分配與效能的提升是成正比的,實現了最優的資源配置後,在此基礎上再考慮進行後面論述的效能調優策略。

資源的分配在使用指令碼提交Spark任務時進行指定,標準的Spark任務提交指令碼如程式碼清單2-1所示:

/usr/opt/modules/spark/bin/spark-submit \
--class com.sddt.spark.Analysis \
--num-executors 80 \
--driver-memory 6g \
--executor-memory 6g \
--executor-cores 3 \
/usr/opt/modules/spark/jar/spark.jar \

 

可以進行分配的資源如表2-1所示:

名稱

說明

--num-executors

配置Executor的數量

--driver-memory

配置Driver記憶體(影響不大)

--executor-memory

配置每個Executor的記憶體大小

--executor-cores

配置每個Executor的CPU core數量

 

調節原則:儘量將任務分配的資源調節到可以使用的資源的最大限度。

對於具體資源的分配,我們分別討論Spark的兩種Cluster執行模式:

第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit指令碼的時候,就根據可用的資源情況進行資源的分配,比如說叢集有15臺機器,每臺機器為8G記憶體,2個CPU core,那麼就指定15個Executor,每個Executor分配8G記憶體,2個CPU core。

第二種是Spark Yarn模式,由於Yarn使用資源佇列進行資源的分配和排程,在表寫submit指令碼的時候,就根據Spark作業要提交到的資源佇列,進行資源的分配,比如資源佇列有400G記憶體,100個CPU core,那麼指定50個Executor,每個Executor分配8G記憶體,2個CPU core。

對錶2-1中的各項資源進行了調節後,得到的效能提升如表2-2所示:

名稱

解析

增加Executor·個數

在資源允許的情況下,增加Executor的個數可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那麼可以並行執行8個task,如果將Executor的個數增加到8個(資源允許的情況下),那麼可以並行執行16個task,此時的並行能力提升了一倍。

增加每個Executor的CPU core個數

在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的並行度。比如有4個Executor,每個Executor有2個CPU core,那麼可以並行執行8個task,如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),那麼可以並行執行16個task,此時的並行能力提升了一倍。

增加每個Executor的記憶體量

在資源允許的情況下,增加每個Executor的記憶體量以後,對效能的提升有三點:

  1. 可以快取更多的資料(即對RDD進行cache),寫入磁碟的資料相應減少,甚至可以不寫入磁碟,減少了可能的磁碟IO;
  2. 可以為shuffle操作提供更多記憶體,即有更多空間來存放reduce端拉取的資料,寫入磁碟的資料相應減少,甚至可以不寫入磁碟,減少了可能的磁碟IO;
  3. 可以為task的執行提供更多記憶體,在task的執行過程中可能建立很多物件,記憶體較小時會引發頻繁的GC,增加記憶體後,可以避免頻繁的GC,提升整體效能。