Spark:程式碼調優
- 程式碼調優
分配更多的資源
- 避免建立重複的RDD,複用同一個RDD
- 對多次使用的RDD進行持久化
如何選擇一種最合適的持久化策略?
預設情況下,效能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的效能開銷;對這個RDD的後續運算元操作,都是基於純記憶體中的資料的操作,不需要從磁碟檔案中讀取資料,效能也很高;而且不需要複製一份資料副本,並遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM記憶體溢位異常。
如果使用MEMORY_ONLY級別時發生了記憶體溢位,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組陣列而已,大大減少了物件數量,並降低了記憶體佔用。這種級別比MEMORY_ONLY多出來的效能開銷,主要就是序列化與反序列化的開銷。但是後續運算元可以基於純記憶體進行操作,因此效能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導致OOM記憶體溢位的異常。
如果純記憶體的級別都無法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的資料量很大,記憶體無法完全放下。序列化後的資料比較少,可以節省記憶體和磁碟的空間開銷。同時該策略會優先儘量嘗試將資料快取在記憶體中,記憶體快取不下才會寫入磁碟。
通常不建議使用DISK_ONLY和字尾為_2的級別:因為完全基於磁碟檔案進行資料的讀寫,會導致效能急劇降低,有時還不如重新計算一次所有RDD。字尾為_2的級別,必須將所有資料都複製一份副本,併發送到其他節點上,資料複製以及網路傳輸會導致較大的效能開銷,除非是要求作業的高可用性,否則不建議使用。
持久化運算元:
cache:
MEMORY_ONLY
persist:
MEMORY_ONLY
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
一般不要選擇帶有_2的持久化級別。
checkpoint:
- 如果一個RDD的計算時間比較長或者計算起來比較複雜,一般將這個RDD的計算結果儲存到HDFS上,這樣資料會更加安全。
- 如果一個RDD的依賴關係非常長,也會使用checkpoint,會切斷依賴關係,提高容錯的效率。
- 儘量避免使用shuffle類的運算元
使用廣播變數來模擬使用join,使用情況:一個RDD比較大,一個RDD比較小。
join運算元=廣播變數+filter、廣播變數+map、廣播變數+flatMap
- 使用map-side預聚合的shuffle操作
即儘量使用有combiner的shuffle類運算元。
combiner概念:
在map端,每一個map task計算完畢後進行的區域性聚合。
combiner好處:
- 降低shuffle write寫磁碟的資料量。
- 降低shuffle read拉取資料量的大小。
- 降低reduce端聚合的次數。
有combiner的shuffle類運算元:
- reduceByKey:這個運算元在map端是有combiner的,在一些場景中可以使用reduceByKey代替groupByKey。
- aggregateByKey
- combineByKey
- 儘量使用高效能的運算元
使用reduceByKey替代groupByKey
使用mapPartition替代map
使用foreachPartition替代foreach
filter後使用coalesce減少分割槽數
使用使用repartitionAndSortWithinPartitions替代repartition與sort類操作
使用repartition和coalesce運算元操作分割槽。
- 使用廣播變數
開發過程中,會遇到需要在運算元函式中使用外部變數的場景(尤其是大變數,比如100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提升效能,函式中使用到外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本。如果變數本身比較大的話(比如100M,甚至1G),那麼大量的變數副本在網路中傳輸的效能開銷,以及在各個節點的Executor中佔用過多記憶體導致的頻繁GC,都會極大地影響效能。如果使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數,會保證每個Executor的記憶體中,只駐留一份變數副本,而Executor中的task執行時共享該Executor中的那份變數副本。這樣的話,可以大大減少變數副本的數量,從而減少網路傳輸的效能開銷,並減少對Executor記憶體的佔用開銷,降低GC的頻率。
廣播大變數傳送方式:Executor一開始並沒有廣播變數,而是task執行需要用到廣播變數,會找executor的blockManager要,bloackManager找Driver裡面的blockManagerMaster要。
使用廣播變數可以大大降低叢集中變數的副本數。不使用廣播變數,變數的副本數和task數一致。使用廣播變數變數的副本和Executor數一致。
- 使用Kryo優化序列化效能
在Spark中,主要有三個地方涉及到了序列化:
- 在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸。
- 將自定義的型別作為RDD的泛型型別時(比如JavaRDD<SXT>,SXT是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。
- 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。
Kryo序列化器介紹:
Spark支援使用Kryo序列化機制。Kryo序列化機制,比預設的Java序列化機制,速度要快,序列化後的資料要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以後,可以讓網路傳輸的資料變少;在叢集中耗費的記憶體資源大大減少。
對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。
Spark中使用Kryo:
Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(new Class[]{SpeedSortKey.class}) |
- 優化資料結構
java中有三種類型比較消耗記憶體:
- 物件,每個Java物件都有物件頭、引用等額外的資訊,因此比較佔用記憶體空間。
- 字串,每個字串內部都有一個字元陣列以及長度等額外資訊。
- 集合型別,比如HashMap、LinkedList等,因為集合型別內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。
因此Spark官方建議,在Spark編碼實現中,特別是對於運算元函式中的程式碼,儘量不要使用上述三種資料結構,儘量使用字串替代物件,使用原始型別(比如Int、Long)替代字串,使用陣列替代集合型別,這樣儘可能地減少記憶體佔用,從而降低GC頻率,提升效能。
- 使用高效能的庫fastutil
fasteutil介紹:
fastutil是擴充套件了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊型別的map、set、list和queue;fastutil能夠提供更小的記憶體佔用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小記憶體的佔用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設定元素的值的時候,提供更快的存取速度。fastutil的每一種集合型別,都實現了對應的Java中的標準介面(比如fastutil的map,實現了Java的Map介面),因此可以直接放入已有系統的任何程式碼中。
fastutil最新版本要求Java 7以及以上版本。
使用:
見RandomExtractCars.java類