Spark常用優化方法
一、前言
- 1.為什麼要優化?
因為你的資源有限、更快速的跑完任務、防止不穩定因素導致的任務失敗。 - 2.怎樣做優化?
通常檢視spark的web UI,或者檢視執行中的logs - 3.做哪方面的優化?
spark 應用程式 80% 的優化,都是集中在三個地方:記憶體,磁碟io,網路io
二、調優詳情
1.spark-submit命令中作為引數設定
資源引數設定的不合理,可能會導致沒有充分利用叢集資源,作業執行會極其緩慢;或者設定的資源過大,佇列沒有足夠的資源來提供,進而導致各種異常。
--num-executors //該引數用於設定Spark作業總共要用多少個Executor程序來執行 --executor-memory //該引數用於設定每個Executor程序的記憶體。Executor記憶體的大小,很多時候直接決定了Spark作業的效能,而且跟常見的JVM OOM異常,也有直接的關聯。 //每個Executor程序的記憶體設定4G~8G較為合適。num-executors乘以executor-memory,就代表了你的Spark作業申請到的總記憶體量(也就是所有Executor程序的記憶體總和) --executor-cores //該引數用於設定每個Executor程序的CPU core數量。這個引數決定了每個Executor程序並行執行task執行緒的能力。同樣建議,如果是跟他人共享這個佇列,那麼num-executors * executor-cores不要超過佇列總CPU core的1/3~1/2左右比較合適 -- driver-memory //設定Driver程序的記憶體,預設1G即可 -- spark.default.parallelism //該引數用於設定每個stage的預設task數量。這個引數極為重要。【如果不去設定這個引數,那麼此時就會導致Spark自己根據底層HDFS的block數量來設定task的數量,預設是一個HDFS block對應一個task。task數量偏少的話,就會導致你前面設定好的Executor的引數都前功盡棄。】 --spark.storage.memoryFraction //用於設定RDD持久化資料在Executor記憶體中能佔的比例,預設是0.6 --spark.shuffle.memoryFraction //設定shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor記憶體的比例,預設是0.2
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
1.提高並行度parallelism
如果設定了new SparkConf().set(“spark.default.parallelism”,“5”),所有的RDDpartition都會被設定為 5個,也就是每個RDD的資料都會被分為5分,每個partition都會啟動一個task來進行計算,對於所有的運算元操作,都只會使用5個task在叢集中執行。所以在這個時候,叢集中有10個cpucore,也只會使用5個來進行task,剩餘空閒。造成資源浪費。
面對10個core,我們可以設定10個甚至20、30個task,因為task之間的執行順序和時間是不一樣的,正好10個也會造成浪費。官方建議並行度設定為core數的2~3倍,可以最高效率使用資源。
2. repartition and coalesce
3.使用Kryo序列化機制
spark預設使用了java自身提供的序列化機制,基於ObjectInputStream和OBjectOutputStream的序列化機制,但是效能比較差,速度慢,序列化之後佔用空間依舊高
spark還提供了另外一種序列化機制——Kryo序列化機制,快,結果集小10倍,缺點是有些型別及時實現了Seriralizable介面,也不一定能被序列化。如果想要達到最好的效能,Kryo要求在Spark中對所有需要序列化的型別進行註冊
4.優化資料結構(屬於程式碼內優化)
5.對於多次使用的RDD進行持久化或者Checkpoint
6.設定廣播共享資料
使用Broadcast廣播,讓其在每個節點中一個副本,而不是每個task一個副本。減少節點上的記憶體佔用。
Broadcast廣播節點上使用該資料的時候不需要呼叫RDD,而是呼叫broadcastConf廣播副本,就可以節省記憶體和網路IO
7. 資料本地化(spark的內部優化機制)
資料本地化處理機制(基於資料距離程式碼的距離)
情況由好到壞:
(1)PROCESS_LOCAL:資料和計算它的程式碼在同一個JVM程序中
(2)NODE_LOCAL:資料和程式碼在同一節點上,但是不在一個程序中,比如在不容的executor程序彙總,或者資料在HDFS檔案的block中
(3)NO_PREF:資料從哪裡過來,最終的效能都是一樣的
(4)RACK_LOCAL:資料和程式碼在同一個機架上
(5)ANY:資料可能在任意地方,比如其他網路環境或者其他機架上
8.reduceByKey和groupByKey
一般情況下,reduceByKey的操作都是可以使用groupByKey().map()來進行替代操作的。但是 groupByKey不會進行本地聚合,原封不動的將ShuffleMapTask的輸出拉渠道ResultTask的記憶體中。因為reduceByKey首先會在map端進行本地的combine,可以大大減要傳輸到reduce的資料量,減少網路IO,只有在reduceByKey解決不了類似問題的時候才會使用groupByKey().map()來進行代替。
參考 https://blog.csdn.net/u012102306/article/details/51637366
一、Spark效能優化:開發調優篇
二、Spark效能優化:資源調優篇
三、Spark效能優化:資料傾斜調優
四、Spark效能優化:shuffle調優