1. 程式人生 > >Spark效能調優-並行度調優

Spark效能調優-並行度調優

效能調優:


並行度調節


效能調優首先是增加資源,增加Application對應的executor的數量,增加executor裡面的cpu core,然後
增加executor裡面的記憶體大小!


這節課也是非常重要的,因為分配完你所能分配的最大資源了!然後對應你的資源調節你程式的並行度!


Spark並行度指的是什麼?
Spark作業,Application,Jobs,action(collect)觸發一個job,1個job;每個job拆成多個stage,
發生shuffle的時候,會拆分出一個stage,reduceByKey;


stage0
val lines = sc.textFile("hdfs://")
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_,1))
val wordCount = pairs.reduceByKey(_ + _)


stage1
val wordCount = pairs.reduceByKey(_ + _)
wordCount.collect()


reduceByKey,stage0的task,在最後,執行到reduceByKey的時候,會為每個stage1的task,
都建立一份檔案(也可能是合併在少量的檔案裡面);每個stage1的task,會去各個節點上的各個
task建立的屬於自己的那一份檔案裡面,拉取資料;每個stage1的task,拉取到的資料,
一定是相同key對應的資料。對相同的key,對應的values,才能去執行我們自定義的function操作(_ + _)




並行度:其實就是指的是,Spark作業中,各個stage的task數量,也就代表了Spark作業的在各個階段
(stage)的並行度。


如果不調節並行度,導致並行度過低,會怎麼樣?


假設,現在已經在spark-submit腳本里面,給我們的spark作業分配了足夠多的資源,比如50個executor,
每個executor有10G記憶體,每個executor有3個cpu core。基本已經達到了叢集或者yarn佇列的資源上限。


task沒有設定,或者設定的很少,比如就設定了,100個task。50個executor,每個executor有3個
cpu core,也就是說,你的Application任何一個stage執行的時候,都有總數在150個cpu core,
可以並行執行。但是你現在,只有100個task,平均分配一下,每個executor分配到2個task,ok,
那麼同時在執行的task,只有100個,每個executor只會並行執行2個task。每個executor剩下的一個
cpu core,就浪費掉了。


你的資源雖然分配足夠了,但是問題是,並行度沒有與資源相匹配,導致你分配下去的資源都浪費掉了。
合理的並行度的設定,應該是要設定的足夠大,大到可以完全合理的利用你的叢集資源;比如上面的例子,
總共叢集有150個cpu core,可以並行執行150個task。那麼就應該將你的Application的並行度,
至少設定成150,才能完全有效的利用你的叢集資源,讓150個task,並行執行;而且task增加到150個以後,
即可以同時並行執行,還可以讓每個task要處理的資料量變少;比如總共150G的資料要處理,
如果是100個task,每個task計算1.5G的資料;現在增加到150個task,可以並行執行,
而且每個task主要處理1G的資料就可以。


很簡單的道理,只要合理設定並行度,就可以完全充分利用你的叢集計算資源,
並且減少每個task要處理的資料量,最終,就是提升你的整個Spark作業的效能和執行速度。


1、task數量,至少設定成與Spark application的總cpu core數量相同(最理想情況,比如總共150個
cpu core,分配了150個task,一起執行,差不多同一時間執行完畢)


2、官方是推薦,task數量,設定成spark application總cpu core數量的2~3倍,比如150個cpu core,
基本要設定task數量為300~500;
實際情況,與理想情況不同的,有些task會執行的快一點,比如50s就完了,有些task,可能會慢一點,
要1分半才執行完,所以如果你的task數量,剛好設定的跟cpu core數量相同,可能還是會導致資源的浪費,
因為,比如150個task,10個先執行完了,剩餘140個還在執行,但是這個時候,有10個cpu core就空閒出來了,
就導致了浪費。那如果task數量設定成cpu core總數的2~3倍,那麼一個task執行完了以後,
另一個task馬上可以補上來,就儘量讓cpu core不要空閒,同時也是儘量提升spark作業執行的效率和速度,
提升效能。


3、如何設定一個Spark Application的並行度?
spark.default.parallelism 
SparkConf conf = new SparkConf()
  .set("spark.default.parallelism", "500")