Spark效能優化點(一)
1.分配更多的資源
1.1.增加executor
1.2.增加每個executor的cpu core
增加executor的並行能力,一個cpu core執行一個task
1.3.增加每個executor的記憶體
1).如果需要對RDD進行cache,那麼更多的記憶體,就可以快取更多的資料,將更少的資料寫入磁碟,甚至不寫入磁碟。減少了 磁碟
2). 對於shuffle操作,reduce端,會需要記憶體來存放拉取的資料並進行聚合。如果記憶體不夠,也會寫入磁碟。如果給executor分 配更多記憶體以後,就有更少的資料,需要寫入磁碟,甚至不需要寫入磁碟。減少了磁碟IO,提升了效能。
3).對於task的執行,可能會建立很多物件。如果記憶體比較小,可能會頻繁導致JVM堆記憶體滿了,然後頻繁GC,垃圾回收,
2.調節spark的並行能力
2.1. Task數量,至少設定成與Spark application的總cpu core數量相同(最理想情況,比如總共150個cpu core,分配了150個task,一 起執行,差不多同一時間執行完畢)
2.2 官方是推薦,task數量,設定成spark application總cpu core數量的2~3倍,比如150個cpu core,基本要設定
2.3.設定方法:
spark.default.parallelism
SparkConf conf = new SparkConf() .set("spark.default.parallelism", "500")
3.RDD重構
3.1.RDD架構重構與優化
儘量去複用RDD,差不多的RDD,可以抽取稱為一個共同的RDD,供後面的RDD計算時,反覆使用。
3.2.公共RDD一定要實現持久化
持久化要根據不同場景選擇cache或persist方法
3.3.持久化,是可以進行序列化的
主要是指persist方法,選擇不同的持久化方式
1).純記憶體MEMORY_ONLY:如果正常將資料持久化在記憶體中,那麼可能會導致記憶體的佔用過大,這樣的話,也許,會導致OOM記憶體溢位。
2).MEMORY_ONLY_SER序列化記憶體儲存:當純記憶體無法支撐公共RDD資料完全存放的時候,就優先考慮,使用序列化的方式在純記憶體中儲存。將RDD的每個partition的資料,序列化成一個大的位元組陣列,就一個物件;序列化後,大大減少記憶體的空間佔用。
序列化的方式,唯一的缺點就是,在獲取資料的時候,需要反序列化。
3).記憶體+磁碟MEMORY_AND_DISK 或MEMORY_AND_DISK_SER
如果序列化純記憶體方式,還是導致OOM,記憶體溢位;就只能考慮磁碟的方式,記憶體+磁碟的普通方式(無序列化)。
MEMORY_AND_DISK :使用未序列化的Java物件格式,優先嚐試將資料儲存在記憶體中。如果記憶體不夠存放所有的資料,會 將資料寫入磁碟檔案中,下次對這個RDD執行運算元時,持久化在磁碟檔案中的資料會被讀取出來使用。
MEMORY_AND_DISK_SER:基本含義同MEMORY_AND_DISK。唯一的區別是,會將RDD中的資料進行序列化,RDD的 每個partition會被序列化成一個位元組陣列。這種方式更加節省記憶體,從而可以避免持久化的資料佔用過多記憶體導致頻繁GC。
4).DISK_ONLY純磁碟
使用未序列化的Java物件格式,將資料全部寫入磁碟檔案中。
5).MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等雙副本機制,為了資料的高可靠性,而且記憶體充足,可以使用雙副本 機制,進行持久化
對於上述任意一種持久化策略,如果加上字尾_2,代表的是將每個持久化的資料,都複製一份副本,並將副本儲存到其他節點上。這種基於副本的持久化機制主要用於進行容錯。假如某個節點掛掉,節點的記憶體或磁碟中的持久化資料丟失了,那麼後續對RDD計算時還可以使用該資料在其他節點上的副本。如果沒有副本的話,就只能將這些資料從源頭處重新計算一遍了。
4.使用廣播變數
廣播變數,初始的時候,就在Drvier上有一份副本。
task在執行的時候,想要使用廣播變數中的資料,此時首先會在自己本地的Executor對應的BlockManager中,嘗試獲取變數副本;如果本地沒有,那麼就從Driver遠端拉取變數副本,並儲存在本地的BlockManager中;此後這個executor上的task,都會直接使用本地的BlockManager中的副本。executor的BlockManager除了從driver上拉取,也可能從其他節點的BlockManager上拉取變數副本,舉例越近越好。
5.優化序列化方法
預設情況下,Spark內部是使用Java的序列化機制,ObjectOutputStream / ObjectInputStream,物件輸入輸出流機制,來進行序列化。這種預設序列化機制的好處在於,處理起來比較方便;也不需要我們手動去做什麼事情,只是,你在運算元裡面使用的變數,必須是實現Serializable介面的,可序列化即可。
但是缺點在於,預設的序列化機制的效率不高,序列化的速度比較慢;序列化以後的資料,佔用的記憶體空間相對還是比較大。
可以手動進行序列化格式的優化Spark支援使用Kryo序列化機制。Kryo序列化機制,比預設的Java序列化機制,速度要快,序列化後的資料要更小,大概是Java序列化機制的1/10。
所以Kryo序列化優化以後,可以讓網路傳輸的資料變少;在叢集中耗費的記憶體資源大大減少。
5.1.Kryo序列化機制,一旦啟用以後,會生效的幾個地方:
1、運算元函式中使用到的外部變數,使用Kryo以後:優化網路傳輸的效能,可以優化叢集中記憶體的佔用和消耗
2、持久化RDD,優化記憶體的佔用和消耗;持久化RDD佔用的記憶體越少,task執行的時候,建立的物件,就不至於頻繁的佔滿 記憶體,頻繁發生GC。
3、shuffle:可以優化網路傳輸的效能
6.使用fastutil擴充套件的集合框架
6.1.fastutil介紹
fastutil是擴充套件了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊型別的map、set、list和queue;
fastutil能夠提供更小的記憶體佔用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小記憶體的佔用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設定元素的值的時候,提供更快的存取速度;
fastutil也提供了64位的array、set和list,以及高效能快速的,以及實用的IO類,來處理二進位制和文字型別的檔案;
fastutil最新版本要求Java 7以及以上版本;
6.2.Spark中應用fastutil的場景:
1、如果運算元函式使用了外部變數;那麼第一,你可以使用Broadcast廣播變數優化;第二,可以使用Kryo序列化類庫,提升序列化效能和效率;第三,如果外部變數是某種比較大的集合,那麼可以考慮使用fastutil改寫外部變數,首先從源頭上就減少記憶體的佔用,通過廣播變數進一步減少記憶體佔用,再通過Kryo序列化類庫進一步減少記憶體佔用。
2、在你的運算元函式裡,也就是task要執行的計算邏輯裡面,如果有邏輯中,出現,要建立比較大的Map、List等集合,可能會佔用較大的記憶體空間,而且可能涉及到消耗效能的遍歷、存取等集合操作;那麼此時,可以考慮將這些集合型別使用fastutil類庫重寫,使用了fastutil集合類以後,就可以在一定程度上,減少task創建出來的集合型別的記憶體佔用。避免executor記憶體頻繁佔滿,頻繁喚起GC,導致效能下降。
6.3.fastutil的使用
第一步:在pom.xml中引用fastutil的包
<dependency>
<groupId>fastutil</groupId>
<artifactId>fastutil</artifactId>
<version>5.0.9</version>
</dependency>
速度比較慢,可能是從國外的網去拉取jar包,可能要等待5分鐘,甚至幾十分鐘,不等
List<Integer> => IntList
基本都是類似於IntList的格式,字首就是集合的元素型別;特殊的就是Map,Int2IntMap,代表了key-value對映的元素型別。除此之外,剛才也看到了,還支援object、reference。
7.程序本地化,程式碼和資料在同一個程序中
Spark在Driver上,對Application的每一個stage的task,進行分配之前,都會計算出每個task要計算的是哪個分片資料,RDD的某個partition;Spark的task分配演算法,優先,會希望每個task正好分配到它要計算的資料所在的節點,這樣的話,就不用在網路間傳輸資料;但是呢,通常來說,有時,事與願違,可能task沒有機會分配到它的資料所在的節點,為什麼呢,可能那個節點的計算資源和計算能力都滿了;所以呢,這種時候,通常來說,Spark會等待一段時間,預設情況下是3s鍾(不是絕對的,還有很多種情況,對不同的本地化級別,都會去等待),到最後,實在是等待不了了,就會選擇一個比較差的本地化級別,比如說,將task分配到靠它要計算的資料所在節點,比較近的一個節點,然後進行計算。
所以可以適當調節等待時間。
7.1.如何判斷要調節
觀察日誌,spark作業的執行日誌,推薦大家在測試的時候,先用client模式,在本地就直接可以看到比較全的日誌。日誌裡面會顯示,starting task。。。,PROCESS LOCAL、NODE LOCAL。觀察大部分task的資料本地化級別,如果大多都是PROCESS_LOCAL,那就不用調節了,如果是發現,好多的級別都是NODE_LOCAL、ANY,那麼最好就去調節一下資料本地化的等待時長,調節完,應該是要反覆調節,每次調節完以後,再來執行,觀察日誌,看看大部分的task的本地化級別有沒有提升;看看,整個spark作業的執行時間有沒有縮短
7.2.怎麼調節
spark.locality.wait,預設是3s;6s,10s
預設情況下,下面3個的等待時長,都是跟上面那個是一樣的,都是3s
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
new SparkConf() .set("spark.locality.wait", "10")
8.降低cache操作的記憶體佔比
spark中,堆記憶體又被劃分成了兩塊兒,一塊兒是專門用來給RDD的cache、persist操作進行RDD資料快取用的;另外一塊兒,就是我們剛才所說的,用來給spark運算元函式的執行使用的,存放函式中自己建立的物件。預設情況下,給RDD cache操作的記憶體佔比,是0.6,60%的記憶體都給了cache操作了。但是問題是,如果某些情況下,cache不是那麼的緊張,問題在於task運算元函式中建立的物件過多,然後記憶體又不太大,導致了頻繁的minor gc,甚至頻繁full gc,導致spark頻繁的停止工作。效能影響會很大。
在yarn去執行的話,那麼就通過yarn的介面,去檢視你的spark作業的執行統計,很簡單,大家一層一層點選進去就好。可以看到每個stage的執行情況,包括每個task的執行時間、gc時間等等。如果發現gc太頻繁,時間太長。此時就可以適當調價這個比例。
降低cache操作的記憶體佔比,大不了用persist操作,選擇將一部分快取的RDD資料寫入磁碟,或者序列化方式,配合Kryo序列化類,減少RDD快取的記憶體佔用;降低cache操作記憶體佔比;對應的,運算元函式的記憶體佔比就提升了。這個時候,可能,就可以減少minor gc的頻率,同時減少full gc的頻率。對效能的提升是有一定的幫助的。
一句話,讓task執行運算元函式時,有更多的記憶體可以使用。spark.storage.memoryFraction,0.6 -> 0.5 -> 0.4 -> 0.2
9.修改executor堆外記憶體
9.1. 有時候,如果你的spark作業處理的資料量特別特別大,幾億資料量;然後spark作業一執行,時不時的報錯,shuffle file cannot find,executor、task lost,out of memory(記憶體溢位);
可能是說executor的堆外記憶體不太夠用,導致executor在執行的過程中,可能會記憶體溢位;然後可能導致後續的stage的task在執行的時候,可能要從一些executor中去拉取shuffle map output檔案,但是executor可能已經掛掉了,關聯的block manager也沒有了;所以可能會報shuffle output file not found;resubmitting task;executor lost;spark作業徹底崩潰
上述情況下,就可以去考慮調節一下executor的堆外記憶體。也許就可以避免報錯;此外,有時,堆外記憶體調節的比較大的時候,對於效能來說,也會帶來一定的提升。
9.2.如何調節
--conf spark.yarn.executor.memoryOverhead=2048
spark-submit腳本里面,去用--conf的方式,去新增配置;一定要注意!!!切記,不是在你的spark作業程式碼中,用new SparkConf().set()這種方式去設定,不要這樣去設定,是沒有用的!一定要在spark-submit指令碼中去設定。
spark.yarn.executor.memoryOverhead(看名字,顧名思義,針對的是基於yarn的提交模式)
預設情況下,這個堆外記憶體上限大概是300多M;後來我們通常專案中,真正處理大資料的時候,這裡都會出現問題,導致spark作業反覆崩潰,無法執行;此時就會去調節這個引數,到至少1G(1024M),甚至說2G、4G 。通常這個引數調節上去以後,就會避免掉某些JVM OOM的異常問題,同時呢,會讓整體spark作業的效能,得到較大的提升。
9.3.遠端拉取資料建立網路異常
當上面的blockmanager本地沒有資料就會嘗試建立遠端的網路連線,並且去拉取資料,此時如果對應的blockmanager由於oom崩潰,就會連線沒有響應,無法建立網路連線;會卡住;ok,spark預設的網路連線的超時時長,是60s;如果卡住60s都無法建立連線的話,那麼就宣告失敗了。
碰到一種情況,偶爾,偶爾,偶爾!!!沒有規律!!!某某file。一串file id。uuid(dsfsfd-2342vs--sdf--sdfsd)。not found。file lost。
這種情況下,很有可能是有那份資料的executor在jvm gc。所以拉取資料的時候,建立不了連線。然後超過預設60s以後,直接宣告失敗。
報錯幾次,幾次都拉取不到資料的話,可能會導致spark作業的崩潰。也可能會導致DAGScheduler,反覆提交幾次stage。TaskScheduler,反覆提交幾次task。大大延長我們的spark作業的執行時間。
可以考慮調節連線的超時時長。
--conf spark.core.connection.ack.wait.timeout=300
spark-submit指令碼,切記,不是在new SparkConf().set()這種方式來設定的。