編寫Spark程式的幾個優化點
雖然spark已經提供了大量簡單易用的API,但要想編寫出高效能的spark應用,必須要對整體框架有一定的瞭解,對於Spark初學者來說是比較困難的。
針對這個這個問題,其實在spark1.6中,已經加入了dataset,官方已經對其進行了一系列的優化,使用者可以將rdd轉化為dataset操作,減少學習成本。不過目前(1.6版本)依舊存在一些bug。
下文講解了使用RDD程式設計時,常用的幾種程式碼優化方法。
1. repartition和coalesce
這兩個方法都可以用在對資料的重新分割槽中,其中repartition
是一個代價很大的操作,它會將所有的資料進行一次shuffle,然後重新分割槽。
如果你僅僅只是想減少分割槽數,從而達到減少碎片任務或者碎片資料的目的。使用coalesce
就可以實現,該操作預設不會進行shuffle。其實repartition
只是coalesce
的shuffle版本。
一般我們會在filter
運算元過濾了大量資料後使用它。比如將 partition 數從1000減少到100。這可以減少碎片任務,降低啟動task的開銷。
note1: 如果想檢視當前rdd的分割槽數,在java/scala中可以使用rdd.partitions.size()
,在python中使用rdd.getNumPartitions()
。
note2: 如果要增加分割槽數,只能使用repartition,或者把partition縮減為一個非常小的值,比如說“1”,也建議使用repartition。
2. mapPartitions和foreachPartitions
適當使用mapPartitions
和foreachPartitions
代替map
和foreach
可以提高程式執行速度。這類操作一次會處理一個partition中的所有資料,而不是一條資料。
mapPartition - 因為每次操作是針對partition的,那麼操作中的很多物件和變數都將可以複用,比如說在方法中使用廣播變數等。
foreachPartition - 在和外部資料庫互動操作時使用,比如 redis , mysql 等。通過該方法可以避免頻繁的建立和銷燬連結,每個partition使用一個數據庫連結,對效率的提升還是非常明顯的。
note: 此類方法也存在缺陷,因為一次處理一個partition中的所有資料,在記憶體不足的時候,將會遇到OOM的問題。
3.reduceByKey和aggregateByKey
使用reduceByKey
/aggregateByKey
代替groupByKey
。
reduceByKey
/aggregateByKey
會先在map端對本地資料按照使用者定義的規則進行一次聚合,之後再將計算的結果進行shuffle,而groupByKey
則會將所以的計算放在reduce階段進行(全量資料在各個節點中進行了分發和傳輸)。所以前者的操作大量的減少shuffle的資料,減少了網路IO,提高執行效率。
4. mapValues
針對k,v結構的rdd,mapValues
直接對value進行操作,不對Key造成影響,可以減少不必要的分割槽操作。
5. broadcast
Spark中廣播變數有幾個常見的用法。
實現map-side join
在需要join操作時,將較小的那份資料轉化為普通的集合(陣列)進行廣播,然後在大資料集中使用小資料進行相應的查詢操作,就可以實現map-side join的功能,避免了join操作的shuffle過程。在我之前的文章中對此用法有詳細說明和過程圖解。
使用較大的外部變數
如果存在較大的外部變數(外部變數可以理解為在driver中定義的變數),比如說字典資料等。在運算過程中,會將這個變數複製出多個副本,傳輸到每個task之中進行執行。如果這個變數的大小有100M或者更大,將會浪費大量的網路IO,同時,executor也會因此被佔用大量的記憶體,造成頻繁GC,甚至引發OOM。
因此在這種情況下,我最好提前對該變數進行廣播,它會被事先將副本分發到每個executor中,同一executor中的task則在執行時共享該變數。很大程度的減少了網路IO開銷以及executor的記憶體使用。
6. 複用RDD
避免建立一些用處不大的中間RDD(比如從父RDD抽取了某幾個欄位形成新的RDD),這樣可以減少一些運算元操作。
對多次使用的RDD進行快取操作,減少重複計算,在下文有說明。
7. cache和persist
cache
方法等價於persist(StorageLevel.MEMORY_ONLY)
不要濫用快取操作。快取操作非常消耗記憶體,快取前考慮好是否還可以對一些無關資料進行過濾。如果你的資料在接下來的操作中只使用一次,則不要進行快取。
如果需要複用RDD,則可以考慮使用快取操作,將大幅度提高執行效率。快取也分幾個級別。
MEMORY_ONLY
如果快取的資料量不大或是記憶體足夠,可以使用這種方式,該策略效率是最高的。但是如果記憶體不夠,之前快取的資料則會被清出記憶體。在spark1.6中,則會直接提示OOM。
MEMORY_AND_DISK
優先將資料寫入記憶體,如果記憶體不夠則寫入硬碟。較為穩妥的策略,但是如果不是很複雜的計算,可能重算的速度比從磁碟中讀取還要快。
MEMORY_ONLY_SER
會將RDD中的資料序列化後存入記憶體,佔用更小的記憶體空間,減少GC頻率,當然,取出資料時需要反序列化,同樣會消耗資源。
MEMORY_AND_DISK_SER
不再贅述。
DISK_ONLY
該策略類似於checkPoint方法,把所有的資料存入了硬碟,再使用的時候從中讀出。適用於資料量很大,重算代價也非常高的操作。
各種
_2
結尾的儲存策略實際上是對快取的資料做了一個備份,代價非常高,一般不建議使用。
結語
spark的優化方法還有很多,這篇文章主要從使用的角度講解了常用的優化方法,具體的使用方法可以參考博主的其他優化文章。