1. 程式人生 > 其它 >spark程式碼優化

spark程式碼優化

1.避免建立重複的RDD,複用同一個RDD

val rdd1 = sc.textFile... val rdd2 = sc.txtFile.. val rdd3 = rdd2.map.. val rdd4 = rdd3.flatMap val rdd5 = rdd1.mapPartitions... val rdd6 = rdd5.map... val rdd7 = rdd4.filter.. var result = rdd7.count...

上述程式碼中,rdd1,rdd5,rdd6是沒有被使用到的無效程式碼

2.對多次使用的RDD進行持久化

如何選擇一種最合適的持久化策略?

預設情況下,效能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的效能開銷;對這個RDD的後續運算元操作,都是基於純記憶體中的資料的操作,不需要從磁碟檔案中讀取資料,效能也很高;而且不需要複製一份資料副本,並遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM記憶體溢位異常。

如果使用MEMORY_ONLY級別時發生了記憶體溢位,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組陣列而已,大大減少了物件數量,並降低了記憶體佔用。這種級別比MEMORY_ONLY多出來的效能開銷,主要就是序列化與反序列化的開銷。但是後續運算元可以基於純記憶體進行操作,因此效能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導致OOM記憶體溢位的異常。

如果純記憶體的級別都無法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的資料量很大,記憶體無法完全放下。序列化後的資料比較少,可以節省記憶體和磁碟的空間開銷。同時該策略會優先儘量嘗試將資料快取在記憶體中,記憶體快取不下才會寫入磁碟。

通常不建議使用DISK_ONLY和字尾為_2的級別:因為完全基於磁碟檔案進行資料的讀寫,會導致效能急劇降低,有時還不如重新計算一次所有RDD。字尾為_2的級別,必須將所有資料都複製一份副本,併發送到其他節點上,資料複製以及網路傳輸會導致較大的效能開銷,除非是要求作業的高可用性,否則不建議使用。

cache() --MEMORY_ONLY = persist() = persist(MEMORY_ONLY)

persist() --MEMORY_ONLY | MEMORY_ONLY_SER | MEMORY_AND_DISK | MEMORY_AND_DISK_SER

checkpoint:

① 如果一個RDD的計算時間比較長或者計算起來比較複雜,一般將這個RDD的計算結果儲存到HDFS上,這樣資料會更加安全。

② 如果一個RDD的依賴關係非常長,也會使用checkpoint,會切斷依賴關係,提高容錯的效率。

3.使用廣播變數

開發過程中,會遇到需要在運算元函式中使用外部變數的場景(尤其是大變數,比如100M以上的大集合),那麼此時就應該使用Spark的廣播(Broadcast)功能來提升效能,函式中使用到外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本。如果變數本身比較大的話(比如100M,甚至1G),那麼大量的變數副本在網路中傳輸的效能開銷,以及在各個節點的Executor中佔用過多記憶體導致的頻繁GC,都會極大地影響效能。如果使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數,會保證每個Executor的記憶體中,只駐留一份變數副本,而Executor中的task執行時共享該Executor中的那份變數副本。這樣的話,可以大大減少變數副本的數量,從而減少網路傳輸的效能開銷,並減少對Executor記憶體的佔用開銷,降低GC的頻率。

廣播大變數傳送方式:Executor一開始並沒有廣播變數,而是task執行需要用到廣播變數,會找executor的blockManager要,bloackManager找Driver裡面的blockManagerMaster要。

使用廣播變數可以大大降低叢集中變數的副本數。不使用廣播變數,變數的副本數和task數一致。使用廣播變數變數的副本和Executor數一致。

4.collect() 回收導致Driver 記憶體緊張,儘量少用

5.儘量避免使用shuffle類的運算元

使用廣播變數+filter 代替join,避免shuffle,從而優化spark

適用場景:

1.進行join中,至少有一個RDD的資料量比較少(比如幾百M或者1~2個G)

2.每個Executor記憶體中都會駐留一份廣播變數的全量資料

join運算元=廣播變數+filter、廣播變數+map、廣播變數+flatMap

程式碼示例:

建立RDD:

val list1 = List((jame,23), (wade,3), (kobe,24)) val list2 = List((jame,cave), (wade,bulls), (kobe,lakers)) val rdd1 = sc.makeRDD(list1) val rdd2 = sc.makeRDD(list2)

傳統join

// 傳統的join操作會導致shuffle操作。 // 因為兩個RDD中,相同的key都需要通過網路拉取到一個節點上,由一個task進行join操作。 val rdd3 = rdd1.join(rdd2) // 結果如下 scala> rdd1.join(rdd2).collect res27: Array[(String, (Int, String))] = Array((kobe,(24,lakers)), (wade,(3,bulls)), (jame,(23,cave)))

使用Broadcast+map的join操作

// Broadcast+map的join操作,不會導致shuffle操作。 // 使用Broadcast將一個數據量較小的RDD作為廣播變數 val rdd2Data = rdd2.collect() val rdd2Bc = sc.broadcast(rdd2Data) // 在rdd1.map運算元中,可以從rdd2DataBroadcast中,獲取rdd2的所有資料。 // 然後進行遍歷,如果發現rdd2中某條資料的key與rdd1的當前資料的key是相同的,那麼就判定可以進行join。 def function(tuple: (String,Int)): (String,(Int,String)) ={ for(value <- rdd2Bc.value){ if(value._1.equals(tuple._1)){ return (tuple._1,(tuple._2,value._2.toString)) } (tuple._1,(tuple._2,null)) } // 在rdd1.map運算元中,可以從rdd2DataBroadcast中,獲取rdd2的所有資料。 // 然後進行遍歷,如果發現rdd2中某條資料的key與rdd1的當前資料的key是相同的,那麼就判定可以進行join。 // 此時就可以根據自己需要的方式,將rdd1當前資料與rdd2中可以連線的資料,拼接在一起(String或Tuple)。 val rdd3 = rdd1.map(function(_)) //結果如下,達到了與傳統join相同的效果 scala> rdd1.map(function(_)).collect res31: Array[(String, (Int, String))] = Array((jame,(23,cave)), (wade,(3,bulls)), (kobe,(24,lakers)))

6.使用map端有預聚合的shuffle操作

儘量使用有combiner的shuffle類運算元

combiner概念:在map端,每一個map task計算完畢後,進行區域性的聚合。

map端預聚合好處:

1.減少shuffle write寫磁碟的資料量

2.減少shuffle read拉取的資料量的大小

3.減少reduce 端聚合次數

有combiner的shuffle類運算元:

reduceByKey:這個運算元在map端是有combiner的,在一些場景中可以使用reduceByKey代替groupByKey運算元。

combineByKey

AggregateByKey

7.使用高效能的運算元:

使用reduceByKey替代groupByKey

使用mapPartition替代map

使用foreachPartition替代foreach

對RDD使用filter進行大量資料過濾之後使用coalesce減少分割槽數

使用repartitionAndSortWithinPartitions替代repartition與sort類操作

使用repartition和coalesce運算元操作分割槽。

8.優化資料結構

java中有三種類型比較消耗記憶體:

1) 物件,每個Java物件都有物件頭、引用等額外的資訊,因此比較佔用記憶體空間。

2) 字串,每個字串內部都有一個字元陣列以及長度等額外資訊。

3) 集合型別,比如HashMap、LinkedList等,因為集合型別內部通常會使用一些內部類來封裝集合元素,比如Map.Entry。

因此Spark官方建議,在Spark編碼實現中,特別是對於運算元函式中的程式碼,儘量不要使用上述三種資料結構,儘量使用字串替代物件,使用原始型別(比如Int、Long)替代字串,使用陣列替代集合型別,這樣儘可能地減少記憶體佔用,從而降低GC頻率,提升效能。

9.使用kryo序列化

在Spark中,主要有三個地方涉及到了序列化:

1) 在運算元函式中使用到外部變數時,該變數會被序列化後進行網路傳輸。(Driver 端定義物件在Executor使用)

2) 將自定義的型別作為RDD的泛型型別時(比如JavaRDD<SXT>,SXT是自定義型別),所有自定義型別物件,都會進行序列化。因此這種情況下,也要求自定義的類必須實現Serializable介面。

3) 使用可序列化的持久化策略時(比如MEMORY_ONLY_SER),Spark會將RDD中的每個partition都序列化成一個大的位元組陣列。

4)task序列化

Kryo序列化器介紹:

Spark支援使用Kryo序列化機制。Kryo序列化機制,比預設的Java序列化機制,速度要快,序列化後的資料要更小,大概是Java序列化機制的1/10。所以Kryo序列化優化以後,可以讓網路傳輸的資料變少;在叢集中耗費的記憶體資源大大減少。

對於這三種出現序列化的地方,我們都可以通過使用Kryo序列化類庫,來優化序列化和反序列化的效能。Spark預設使用的是Java的序列化機制,也就是ObjectOutputStream/ObjectInputStream API來進行序列化和反序列化。但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java序列化類庫的效能要高很多。官方介紹,Kryo序列化機制比Java序列化機制,效能高10倍左右。Spark之所以預設沒有使用Kryo作為序列化類庫,是因為Kryo要求最好要註冊所有需要進行序列化的自定義型別,因此對於開發者來說,這種方式比較麻煩。

使用:

Sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new Class[]{SpeedSortKey.class})

10.使用高效能的庫fastutil

fasteutil介紹:

fastutil是擴充套件了Java標準集合框架(Map、List、Set;HashMap、ArrayList、HashSet)的類庫,提供了特殊型別的map、set、list和queue;fastutil能夠提供更小的記憶體佔用,更快的存取速度;我們使用fastutil提供的集合類,來替代自己平時使用的JDK的原生的Map、List、Set,好處在於,fastutil集合類,可以減小記憶體的佔用,並且在進行集合的遍歷、根據索引(或者key)獲取元素的值和設定元素的值的時候,提供更快的存取速度。fastutil的每一種集合型別,都實現了對應的Java中的標準介面(比如fastutil的map,實現了Java的Map介面),因此可以直接放入已有系統的任何程式碼中。

fastutil最新版本要求Java 7以及以上版本。