1. 程式人生 > >Spark專案實戰-實際專案中常見的優化點-分配更多的資源和調節並行度

Spark專案實戰-實際專案中常見的優化點-分配更多的資源和調節並行度

1、分配更多的資源

(1)分配哪些資源?executor、cpu per executor、memory per executor、driver memory。

(2)在哪裡分配這些資源?在我們在生產環境中,提交spark作業時,用的spark-submit shell指令碼,裡面調整對應的引數 。

/usr/local/spark/bin/spark-submit \
--class cn.spark.sparktest.core.WordCountCluster \
--num-executors 3 \  配置executor的數量
--driver-memory 100m \  配置driver的記憶體(影響不大)
--executor-memory 100m \  配置每個executor的記憶體大小
--executor-cores 3 \  配置每個executor的cpu core數量
/usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

(3)調節到多大,算是最大呢?

第一種:Spark Standalone模式。叢集上搭建了一套Spark叢集,我們應該清楚每臺機器還能夠給你使用的,大概有多少記憶體,多少cpu core。那麼設定的時候就根據這個實際的情況去調節每個spark作業的資源分配。比如說你的每臺機器能夠給你使用4G記憶體,2個cpu core,20臺機器;然後有20個executor,那麼平均每個executor就4G記憶體,2個cpu core。

第二種:Yarn模式。那麼我們應該看下spark作業要提交到的資源佇列大概有多少資源?500G記憶體,100個cpu core;然後有50個executor,那麼平均每個executor就10G記憶體,2個cpu core。

一個原則,你能使用的資源有多大,就儘量去調節到最大的大小(executor的數量,幾十個到上百個不等;executor記憶體;executor cpu core)

(4)為什麼調節了資源以後,效能可以提升?

我們知道SparkContext會將我們的運算元切割成大量的task,提交到Application的executor上面去執行。那麼:

第一點:增加executor。如果executor數量比較少,那麼能夠並行執行的task數量就比較少,就意味著我們的Application的並行執行的能力就很弱。 比如有3個executor,每個executor有2個cpu core,那麼同時能夠並行執行的task就是6個。6個執行完以後,再換下一批6個task。 增加了executor數量以後,那麼就意味著能夠並行執行的task數量也就變多了。比如原先是6個,現在可能可以並行執行10個,甚至20個,100個。那麼並行能力就比之前提升了數倍數十倍。 相應的效能(執行的速度)也能提升數倍~數十倍。

第二點:增加每個executor的cpu core,也是增加了執行的並行能力。原本20個executor,每個才2個cpu core。能夠並行執行的task數量就是40個task。現在每個executor的cpu core增加到了5個。能夠並行執行的task數量就是100個task。執行的速度,提升了2.5倍。

第三點:增加每個executor的記憶體量。增加了記憶體量以後對效能的提升有以下三點:

a、如果需要對RDD進行cache,那麼更多的記憶體就可以快取更多的資料,將更少的資料寫入磁碟甚至不寫入磁碟。減少了磁碟IO。

b、對於shuffle操作,reduce端會需要記憶體來存放拉取的資料並進行聚合。如果記憶體不夠也會寫入磁碟。如果給executor分配更多記憶體以後,就有更少的資料需要寫入磁碟甚至不需要寫入磁碟。減少了磁碟IO,提升了效能。

c、對於task的執行可能會建立很多物件。如果記憶體比較小,可能會頻繁導致JVM堆記憶體滿了,然後頻繁GC,垃圾回收,minor GC和full GC。(速度很慢)。記憶體加大以後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

2、調節並行度

並行度:其實指的是Spark作業中各個stage的task數量,也就代表了Spark作業的在各個階段(stage)的並行度。

如果不調節並行度導致並行度過低會怎麼樣?

假設現在已經在spark-submit腳本里面,給我們的spark作業分配了足夠多的資源,比如50個executor,每個executor有10G記憶體,每個executor有3個cpu core。

但是task沒有設定或者設定的很少,比如就設定了100個task。那麼50個executor,每個executor有3個cpu core,也就是說你的Application任何一個stage執行的時候都有總數在150個cpu core可以並行執行。但是現在只有100個task,平均分配一下,每個executor分配到2個task,那麼同時在執行的task只有100個,每個executor只會並行執行2個task。每個executor剩下的一個cpu core就浪費掉了。

你的資源雖然分配足夠了,但是問題是並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。

如何合理設定並行度?

合理的並行度的設定,應該是要設定的足夠大,大到可以完全合理的利用你的叢集資源。比如上面的例子,總共叢集有150個cpu core,可以並行執行150個task。那麼就應該將你的Application的並行度,至少設定成150,才能完全有效的利用你的叢集資源,讓150個task並行執行。而且task增加到150個以後,即可以同時並行執行,還可以讓每個task要處理的資料量變少。比如總共150G的資料要處理,如果是100個task,每個task計算1.5G的資料;現在增加到150個task可以並行執行,而且每個task主要處理1G的資料就可以。

很簡單的道理,只要合理設定並行度就可以完全充分利用你的叢集計算資源,並且減少每個task要處理的資料量,最終就是提升你的整個Spark作業的效能和執行速度。

(1)task數量至少設定成與Spark application的總cpu core數量相同(最理想情況,比如總共150個cpu core,分配了150個task,一起執行,差不多同一時間執行完畢)

(2)官方是推薦,task數量設定成spark application總cpu core數量的2~3倍,比如150個cpu core,基本要設定task數量為300~500。

實際情況與理想情況不同的,有些task會執行的快一點,比如50s就完了,有些task可能會慢一點要1分半才執行完,所以如果你的task數量,剛好設定的跟cpu core數量相同,可能還是會導致資源的浪費,因為比如150個task,10個先執行完了,剩餘140個還在執行,但是這個時候有10個cpu core就空閒出來了就導致了浪費。那如果task數量設定成cpu core總數的2~3倍,那麼一個task執行完了以後,另一個task馬上可以補上來就儘量讓cpu core不要空閒,同時也是儘量提升spark作業執行的效率和速度提升效能。

(3)如何設定一個Spark Application的並行度?

SparkConf conf = new SparkConf() //
        .setAppName(Constants.SPARK_APP_NAME_SESSION) //
        .setMaster("local") //
        .set("spark.default.parallelism", "10"); // 設定並行度,理想值應該是總cpu數量的2~3倍。