Spark優化(四):儘量避免使用shuffle類運算元
儘量避免使用shuffle類運算元
如果有可能的話,要儘量避免使用shuffle類運算元。因為Spark作業執行過程中,最消耗效能的地方就是shuffle過程。shuffle過程,簡單來說,就是將分佈在叢集中多個節點上的同一個key拉取到同一個節點上,進行聚合或join等操作。比如reduceByKey、join等運算元,都會觸發shuffle操作。
shuffle過程中,各個節點上的相同key都會先寫入本地磁碟檔案中,然後其它節點需要通過網路傳輸拉取各個節點上的磁碟檔案中的相同key。而且相同key都拉取到同一個節點進行聚合操作時,還有可能會因為一個節點上處理的key過多,導致記憶體不夠存放,進而溢寫到磁碟檔案中。因此在shuffle過程中,可能會發生大量的磁碟檔案讀寫的IO操作,以及資料的網路傳輸操作。磁碟IO和網路資料傳輸也是shuffle效能較差的主要原因。
所以在我們的開發過程中,能避免則儘可能避免使用reduceByKey、join、distinct、repartition等會進行shuffle的運算元,儘量使用map類的非shuffle運算元。這樣的話,沒有shuffle操作或者僅有較少shuffle操作的Spark作業,可以大大減少效能開銷。
Broadcast與map進行join程式碼示例
// 傳統的join操作會導致shuffle操作。
// 因為兩個RDD中,相同的key都需要通過網路拉取到一個節點上,由一個task進行join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不會導致shuffle操作。
// 使用Broadcast將一個數據量較小的RDD作為廣播變數。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map運算元中,可以從rdd2DataBroadcast中,獲取rdd2的所有資料。
// 然後進行遍歷,如果發現rdd2中某條資料的key與rdd1的當前資料的key是相同的,那麼就判定可以進行join。
// 此時就可以根據自己需要的方式,將rdd1當前資料與rdd2中可以連線的資料,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建議僅僅在rdd2的資料量比較少(比如幾百M,或者一兩G)的情況下使用。
// 因為每個Executor的記憶體中,都會駐留一份rdd2的全量資料。