1. 程式人生 > >Spark效能優化點(一)

Spark效能優化點(一)

1.分配更多的資源

   1.1.增加executor

    1.2.增加每個executor的cpu core

          增加executor的並行能力,一個cpu core執行一個task

    1.3.增加每個executor的記憶體

      1).如果需要對RDD進行cache,那麼更多的記憶體,就可以快取更多的資料,將更少的資料寫入磁碟,甚至不寫入磁碟。減少了       磁碟

IO

     2). 對於shuffle操作,reduce端,會需要記憶體來存放拉取的資料並進行聚合。如果記憶體不夠,也會寫入磁碟。如果給executor分          配更多記憶體以後,就有更少的資料,需要寫入磁碟,甚至不需要寫入磁碟。減少了磁碟IO,提升了效能。

     3).對於task的執行,可能會建立很多物件。如果記憶體比較小,可能會頻繁導致JVM堆記憶體滿了,然後頻繁GC,垃圾回收,                 

minor GCfull GC。(速度很慢)。記憶體加大以後,帶來更少的GC,垃圾回收,避免了速度變慢,速度變快了。

2.調節spark的並行能力

   2.1. Task數量,至少設定成與Spark application的總cpu core數量相同(最理想情況,比如總共150cpu core,分配了150task,一     起執行,差不多同一時間執行完畢)

   2.2 官方是推薦,task數量,設定成spark applicationcpu core數量的2~3倍,比如150cpu core,基本要設定

task數量為300~500

  2.3.設定方法:

spark.default.parallelism

SparkConf conf = new SparkConf()   .set("spark.default.parallelism", "500")

3.RDD重構

3.1.RDD架構重構與優化

     儘量去複用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供後面的RDD計算時,反覆使用。

3.2.公共RDD一定要實現持久化

     持久化要根據不同場景選擇cache或persist方法

3.3.持久化,是可以進行序列化的

     主要是指persist方法,選擇不同的持久化方式

     1).純記憶體MEMORY_ONLY:如果正常將資料持久化在記憶體中,那麼可能會導致記憶體的佔用過大,這樣的話,也許,會導致OOM記憶體溢位。

     2).MEMORY_ONLY_SER序列化記憶體儲存:當純記憶體無法支撐公共RDD資料完全存放的時候,就優先考慮,使用序列化的方式在純記憶體中儲存。將RDD的每個partition的資料,序列化成一個大的位元組陣列,就一個物件;序列化後,大大減少記憶體的空間佔用。

     序列化的方式,唯一的缺點就是,在獲取資料的時候,需要反序列化。

     3).記憶體+磁碟MEMORY_AND_DISK 或MEMORY_AND_DISK_SER

     如果序列化純記憶體方式,還是導致OOM,記憶體溢位;就只能考慮磁碟的方式,記憶體+磁碟的普通方式(無序列化)。

    MEMORY_AND_DISK :使用未序列化的Java物件格式,優先嚐試將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會       將資料寫入磁碟檔案中,下次對這個RDD執行運算元時,持久化在磁碟檔案中的資料會被讀取出來使用。

      MEMORY_AND_DISK_SER:基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的資料進行序列化,RDD的        每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。

     4).DISK_ONLY純磁碟

       使用未序列化的Java物件格式,將資料全部寫入磁碟檔案中。

     5).MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等雙副本機制,為了資料的高可靠性,而且記憶體充足,可以使用雙副本        機制,進行持久化

       對於上述任意一種持久化策略,如果加上字尾_2,代表的是將每個持久化的資料,都複製一份副本,並將副本儲存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的記憶體或磁碟中的持久化資料丟失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就只能將這些資料從源頭處重新計算一遍了。

4.使用廣播變數

  廣播變數,初始的時候,就在Drvier上有一份副本。

   task在執行的時候,想要使用廣播變數中的資料,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變數副本;如果本地沒有,那麼就從Driver遠端拉取變數副本,並儲存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。executorBlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變數副本,舉例越近越好。

5.優化序列化方法

預設情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,物件輸入輸出流機制,來進行序列化。這種預設序列化機制的好處在於,處理起來比較方便;也不需要我們手動去做什麼事情,只是,你在運算元裡面使用的變數,必須是實現Serializable介面的,可序列化即可。

但是缺點在於,預設的序列化機制的效率不高,序列化的速度比較慢;序列化以後的資料,佔用的記憶體空間相對還是比較大。

可以手動進行序列化格式的優化Spark支援使用Kryo序列化機制。Kryo序列化機制,比預設的Java序列化機制,速度要快,序列化後的資料要更小,大概是Java序列化機制的1/10

所以Kryo序列化優化以後,可以讓網路傳輸的資料變少;在叢集中耗費的記憶體資源大大減少。

5.1.Kryo序列化機制,一旦啟用以後,會生效的幾個地方:

   1、運算元函式中使用到的外部變數,使用Kryo以後:優化網路傳輸的效能,可以優化叢集中記憶體的佔用和消耗

   2、持久化RDD,優化記憶體的佔用和消耗;持久化RDD佔用的記憶體越少,task執行的時候,建立的物件,就不至於頻繁的佔滿       記憶體,頻繁發生GC

   3shuffle:可以優化網路傳輸的效能

6.使用fastutil擴充套件的集合框架

6.1.fastutil介紹

fastutil是擴充套件了Java標準集合框架(MapListSetHashMapArrayListHashSet)的類庫,提供了特殊型別的mapsetlistqueue

fastutil能夠提供更小的記憶體佔用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的MapListSet,好處在於,fastutil集合類,可以減小記憶體的佔用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設定元素的值的時候,提供更快的存取速度;

fastutil也提供了64位的arraysetlist,以及高效能快速的,以及實用的IO類,來處理二進位制和文字型別的檔案;

fastutil最新版本要求Java 7以及以上版本;

6.2.Spark中應用fastutil的場景:

1、如果運算元函式使用了外部變數;那麼第一,你可以使用Broadcast廣播變數優化;第二,可以使用Kryo序列化類庫,提升序列化效能和效率;第三,如果外部變數是某種比較大的集合,那麼可以考慮使用fastutil改寫外部變數,首先從源頭上就減少記憶體的佔用,通過廣播變數進一步減少記憶體佔用,再通過Kryo序列化類庫進一步減少記憶體佔用。

2、在你的運算元函式裡,也就是task要執行的計算邏輯裡面,如果有邏輯中,出現,要建立比較大的MapList等集合,可能會佔用較大的記憶體空間,而且可能涉及到消耗效能的遍歷、存取等集合操作;那麼此時,可以考慮將這些集合型別使用fastutil類庫重寫,使用了fastutil集合類以後,就可以在一定程度上,減少task創建出來的集合型別的記憶體佔用。避免executor記憶體頻繁佔滿,頻繁喚起GC,導致效能下降。

6.3.fastutil的使用  

第一步:在pom.xml中引用fastutil的包

<dependency>

    <groupId>fastutil</groupId>

    <artifactId>fastutil</artifactId>

    <version>5.0.9</version>

</dependency>

速度比較慢,可能是從國外的網去拉取jar包,可能要等待5分鐘,甚至幾十分鐘,不等

List<Integer> => IntList

基本都是類似於IntList的格式,字首就是集合的元素型別;特殊的就是MapInt2IntMap,代表了key-value對映的元素型別。除此之外,剛才也看到了,還支援objectreference

7.程序本地化,程式碼和資料在同一個程序中

SparkDriver上,對Application的每一個stagetask,進行分配之前,都會計算出每個task要計算的是哪個分片資料,RDD的某個partitionSparktask分配演算法,優先,會希望每個task正好分配到它要計算的資料所在的節點,這樣的話,就不用在網路間傳輸資料;但是呢,通常來說,有時,事與願違,可能task沒有機會分配到它的資料所在的節點,為什麼呢,可能那個節點的計算資源和計算能力都滿了;所以呢,這種時候,通常來說,Spark會等待一段時間,預設情況下是3s鍾(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的資料所在節點,比較近的一個節點,然後進行計算。

所以可以適當調節等待時間。

7.1.如何判斷要調節

觀察日誌,spark作業的執行日誌,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日誌。日誌裡面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL。觀察大部分task的資料本地化級別,如果大多都是PROCESS_LOCAL,那就不用調節了,如果是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下資料本地化的等待時長,調節完,應該是要反覆調節,每次調節完以後,再來執行,觀察日誌,看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的執行時間有沒有縮短

7.2.怎麼調節

spark.locality.wait,預設是3s6s10s

預設情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s

spark.locality.wait.process

spark.locality.wait.node

spark.locality.wait.rack

new SparkConf()  .set("spark.locality.wait", "10")

8.降低cache操作的記憶體佔比

spark中,堆記憶體又被劃分成了兩塊兒,一塊兒是專門用來給RDDcachepersist操作進行RDD資料快取用的;另外一塊兒,就是我們剛才所說的,用來給spark運算元函式的執行使用的,存放函式中自己建立的物件。預設情況下,給RDD cache操作的記憶體佔比,是0.660%的記憶體都給了cache操作了。但是問題是,如果某些情況下,cache不是那麼的緊張,問題在於task運算元函式中建立的物件過多,然後記憶體又不太大,導致了頻繁的minor gc,甚至頻繁full gc,導致spark頻繁的停止工作。效能影響會很大。

在yarn去執行的話,那麼就通過yarn的介面,去檢視你的spark作業的執行統計,很簡單,大家一層一層點選進去就好。可以看到每個stage的執行情況,包括每個task的執行時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以適當調價這個比例。

降低cache操作的記憶體佔比,大不了用persist操作,選擇將一部分快取的RDD資料寫入磁碟,或者序列化方式,配合Kryo序列化類,減少RDD快取的記憶體佔用;降低cache操作記憶體佔比;對應的,運算元函式的記憶體佔比就提升了。這個時候,可能,就可以減少minor gc的頻率,同時減少full gc的頻率。對效能的提升是有一定的幫助的。

一句話,讓task執行運算元函式時,有更多的記憶體可以使用。spark.storage.memoryFraction0.6 -> 0.5 -> 0.4 -> 0.2

9.修改executor堆外記憶體

9.1. 有時候,如果你的spark作業處理的資料量特別特別大,幾億資料量;然後spark作業一執行,時不時的報錯,shuffle file cannot findexecutortask lostout of memory(記憶體溢位);

可能是說executor的堆外記憶體不太夠用,導致executor在執行的過程中,可能會記憶體溢位;然後可能導致後續的stagetask在執行的時候,可能要從一些executor中去拉取shuffle map output檔案,但是executor可能已經掛掉了,關聯的block manager也沒有了;所以可能會報shuffle output file not foundresubmitting taskexecutor lostspark作業徹底崩潰

上述情況下,就可以去考慮調節一下executor的堆外記憶體。也許就可以避免報錯;此外,有時,堆外記憶體調節的比較大的時候,對於效能來說,也會帶來一定的提升。

9.2.如何調節

--conf spark.yarn.executor.memoryOverhead=2048

spark-submit腳本里面,去用--conf的方式,去新增配置;一定要注意!!!切記,不是在你的spark作業程式碼中,用new SparkConf().set()這種方式去設定,不要這樣去設定,是沒有用的!一定要在spark-submit指令碼中去設定。

spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式)

預設情況下,這個堆外記憶體上限大概是300M;後來我們通常專案中,真正處理大資料的時候,這裡都會出現問題,導致spark作業反覆崩潰,無法執行;此時就會去調節這個引數,到至少1G1024M),甚至說2G4G通常這個引數調節上去以後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業的效能,得到較大的提升。

9.3.遠端拉取資料建立網路異常

當上面的blockmanager本地沒有資料就會嘗試建立遠端的網路連線,並且去拉取資料,此時如果對應的blockmanager由於oom崩潰,就會連線沒有響應,無法建立網路連線;會卡住;okspark預設的網路連線的超時時長,是60s;如果卡住60s都無法建立連線的話,那麼就宣告失敗了。

碰到一種情況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file iduuiddsfsfd-2342vs--sdf--sdfsd)。not foundfile lost

這種情況下,很有可能是有那份資料的executorjvm gc。所以拉取資料的時候,建立不了連線。然後超過預設60s以後,直接宣告失敗。

報錯幾次,幾次都拉取不到資料的話,可能會導致spark作業的崩潰。也可能會導致DAGScheduler,反覆提交幾次stageTaskScheduler,反覆提交幾次task。大大延長我們的spark作業的執行時間。

可以考慮調節連線的超時時長。

--conf spark.core.connection.ack.wait.timeout=300

spark-submit指令碼,切記,不是在new SparkConf().set()這種方式來設定的。