1. 程式人生 > >spark 優化運算元選擇

spark 優化運算元選擇

摘要

   1.使用reduceByKey/aggregateByKey替代groupByKey

  2.使用mapPartitions替代普通map

  3.使用foreachPartitions替代foreach

  4.使用filter之後進行coalesce操作

  5.使用repartitionAndSortWithinPartitions替代repartition與sort類操作

  6.使用broadcast使各task共享同一Executor的集合替代運算元函式中各task傳送一份集合

  7.使用相同分割槽方式的join可以避免Shuffle

  8.map和flatMap選擇

  9.cache和persist選擇

  10.zipWithIndex和zipWithUniqueId選擇

  

內容

1.使用reduceByKey/aggregateByKey替代groupByKey

 reduceByKey/aggregateByKey底層使用combinerByKey實現,會在map端進行區域性聚合;groupByKey不會

2.使用mapPartitions替代普通map

mapPartitions類的運算元,一次函式呼叫會處理一個partition所有的資料,而不是一次函式呼叫處理一條,效能相對來說會高一些。但是有的時候,使用mapPartitions會出現OOM(記憶體溢位)的問題。因為單次函式呼叫就要處理掉一個partition所有的資料,如果記憶體不夠,垃圾回收時是無法回收掉太多物件的,很可能出現OOM異常。所以使用這類操作時要慎重!

3.使用foreachPartitions替代foreach

原理類似於“使用mapPartitions替代map”,也是一次函式呼叫處理一個partition的所有資料,而不是一次函式呼叫處理一條資料。在實踐中發現,foreachPartitions類的運算元,對效能的提升還是很有幫助的。比如在foreach函式中,將RDD中所有資料寫MySQL,那麼如果是普通的foreach運算元,就會一條資料一條資料地寫,每次函式呼叫可能就會建立一個數據庫連線,此時就勢必會頻繁地建立和銷燬資料庫連線,效能是非常低下;但是如果用foreachPartitions運算元一次性處理一個partition的資料,那麼對於每個partition,只要建立一個數據庫連線即可,然後執行批量插入操作,此時效能是比較高的。實踐中發現,對於1萬條左右的資料量寫MySQL,效能可以提升30%以上。

4.使用filter之後進行coalesce操作

通常對一個RDD執行filter運算元過濾掉RDD中較多資料後(比如30%以上的資料),建議使用coalesce運算元,手動減少RDD的partition數量,將RDD中的資料壓縮到更少的partition中去。因為filter之後,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行後續的計算,其實每個task處理的partition中的資料量並不是很多,有一點資源浪費,而且此時處理的task越多,可能速度反而越慢。因此用coalesce減少partition數量,將RDD中的資料壓縮到更少的partition之後,只要使用更少的task即可處理完所有的partition。在某些場景下,對於效能的提升會有一定的幫助。

5.使用repartitionAndSortWithinPartitions替代repartition與sort類操作

repartitionAndSortWithinPartitions是Spark官網推薦的一個運算元,官方建議,如果需要在repartition重分割槽之後,還要進行排序,建議直接使用repartitionAndSortWithinPartitions運算元。因為該運算元可以一邊進行重分割槽的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,效能可能是要高的。

6.使用broadcast使各task共享同一Executor的集合替代運算元函式中各task傳送一份集合

在運算元函式中使用到外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本。如果變數本身比較大的話(比如100M,甚至1G),那麼大量的變數副本在網路中傳輸的效能開銷,以及在各個節點的Executor中佔用過多記憶體導致的頻繁GC,都會極大地影響效能。

因此對於上述情況,如果使用的外部變數比較大,建議使用Spark的廣播功能,對該變數進行廣播。廣播後的變數,會保證每個Executor的記憶體中,只駐留一份變數副本,而Executor中的task執行時共享該Executor中的那份變數副本。這樣的話,可以大大減少變數副本的數量,從而減少網路傳輸的效能開銷,並減少對Executor記憶體的佔用開銷,降低GC的頻率。

7.使用相同分割槽方式的join可以避免Shuffle

Spark知道當前面的轉換已經根據相同的partitioner分割槽器分好區的時候如何避免shuffle。如果RDD有相同數目的分割槽,join操作不需要額外的shuffle操作。因為RDD是相同分割槽的,rdd1中任何一個分割槽的key集合都只能出現在rdd2中的單個分割槽中。因此rdd3中任何一個輸出分割槽的內容僅僅依賴rdd1和rdd2中的單個分割槽,第三次shuffle就沒有必要了。

1 2 3 rdd 1  =  someRdd.reduceByKey(...) rdd 2  =  someOtherRdd.reduceByKey(...) rdd 3  =  rdd 1 .join(rdd 2 )

那如果rdd1和rdd2使用不同的分割槽器,或者使用預設的hash分割槽器但配置不同的分割槽數呢?那樣的話,僅僅只有一個rdd(較少分割槽的RDD)需要重新shuffle後再join。(參考自

8.map和flatMap選擇

def map[U](f: (T) ⇒ U)(implicit arg0: ClassTag[U])RDD[U] //Return a new RDD by applying a function to all elements of this RDD.

def flatMap[U](f: (T) ⇒ TraversableOnce[U])(implicit arg0: ClassTag[U])RDD[U]  //Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
前者的輸入是一個單一資料,後者的輸入資料是一個可迭代的集合。同樣是執行某種對映函式,後者最終會把元素打平,即map的輸入輸出是一對一的,而flatMap的輸出是一對多的