1. 程式人生 > 其它 >Spark Core 開發調優

Spark Core 開發調優

1、調優的意義

在大資料分析計算領域,Spark已經成為主流的,非常受歡迎的計算引擎之一。Spark的功能涵蓋了大資料領域的批處理、類SQL處理、實時計算、機器學習、圖計算等多種不同型別的計算操作,應用範圍廣泛、前景一片大好,今天許多公司作為主流計算引擎使用,大多數Spark使用者,最初都是想提高計算效能而選擇使用Spark作為計算引擎的,可見Spark足以使大資料作業執行速度更快,效能得到驚人的提高。然而,通過Spark開發出高效能的大資料作業並不是那麼簡單的事情,如果沒有對Spark進行合理的調優,Spark作業的執行速度很可能受到制約,這樣就完全體現不出Spark作為一種快速的大資料計算引擎的優勢。因此,想要用好Spark,就不需要其進行合理的調優。

Spark效能調優實際上由多部份組成,不是調節幾個引數就能立竿見影。我們需要根據具體的業務場景、資料規模以及叢集環境對Spark進行綜合性的分析,然後進行多個方面的調節,才能獲得綜合最佳效果。這裡我通過學習Spark的知識,以及在工作中的總結分析,總結出了一些關於Spark作業的優化方案。整套方案分為開發調優,資源調優,資料傾斜調優,shuffle調優幾個部分。併發和資源調優是所有Spark作業都要注意和遵循的一些基本元組,是高效能Spark作業的基礎;資料傾斜調優,主要講述了一套完整的用來解決如惡對Spark作業的shuule執行過程以及細節進行調優。

今天介紹下Spark效能優化基礎篇,主要講解開發調優和資源調優。

2、開發調優

Spark 效能優化的第一步,就是在開發Spark作業的時候注意和應用一些效能優化的基本元組。開發調優,就是讓大家瞭解Spark基本開發原則,包括 RDD  lineage設計,運算元合理使用,特殊操作優化等。在開發過程中,時刻應該注意以上原則,並將這些原則根據不同的業務應用場景,靈活的使用這些規則幫助改善Spark作業的執行效率。

2.1、原則1 避免建立重複的RDD

通常來講,我們在開發一個Spark作業的時候,首先是基於某個資料來源建立一個初始的RDD,接著對這個RDD進行某種運算元操作,然後得到下一個RDD,以此類推,迴圈往復,指導計算出最終我們想要的結果,在這個過程中,多個RDD會通過不同的運算元操作串起來,這個RDD串就是 RDD lineage,也就是平時說的RDD血緣關係。我們在開發過程中要注意,對同一份資料之建立一個RDD,不要對一份資料建立多個RDD。一些Spark新手在剛開始開發Spark作業時,可能會完了自己已經對謀一份資料建立了一個RDD,從而導致對於同一份RDD建立了多個RDD,這就意味著,我們的Spark作業進行多次重複的計算來建立多個代表相同資料的RDD,進而增加了作業的效能開銷。

例子

    /**
     * 需要對 名為 t.txt 的檔案進行一次 map 操作,在進行一次 reduce 操作,也就是說對一個檔案進行兩次操作,
     * 錯誤的做法;對同一份資料進行多次運算元操作,建立多個RDD
     * 這裡執行了 兩次 textFile 方法,針對同一個檔案,建立兩個RDD,然後分別執行了對應的操作
     * 這種請情況下,Spark需要從檔案系統載入兩次檔案,並建立兩個RDD,第二次載入檔案建立RDD的開銷實屬浪費
     */

    val rdd1: RDD[String] = sc.textFile("data/1.txt")
    rdd1.map(...)
    val rdd2: RDD[String] = sc.textFile("data/1.txt")
    rdd2.reduce(...)

    /**
     * 正確的做法:對於一份資料執行多次操作的時候,只建立一個RDD
     * 這種做法明顯比上一種好多了,因為我們對於同一份資料只建立了一個RDD,然後對這個RDD進行兩次運算元操作
     * 注意這裡到此優化並沒有結束,由rdd1被執行了兩次運算元操作,第二次執行reduce操作的時候,還是會再次從源頭重新計算一次rdd1 的資料,還是由重複載入資料的開銷的
     * 要想徹底解決這個問題,需要結合原則三,對多次使用的RDD進行快取,才能保證多次呼叫的RDD只計算一次
     */
    val rdd1: RDD[String] = sc.textFile("data/1.txt")
    rdd1.map(...)
    rdd1.reduce(...)

2.2、原則2 儘可能複用同一個RDD

除了避免在開發過程中對同一份資料建立多個RDD之外,在對不同的資料執行運算元操作時儘可能的複用一個RDD。比如說,有一個RDD的資料時key-value型別的,另一個數據時單 value 型別,這兩個RDD的 value資料完全一樣,那麼我們可以使用key-value型別的RDD,因為其中已經包含了另一個RDD的資料,對於類似的這種RDD的資料有重疊或者包含的情況,我們應該儘量複用一個RDD,這樣儘可能減少RDD的數量,從而減少運算元執行的次數,

    /**
     * 錯誤做法
     * rdd1 是一個<Long,String> 格式的RDD
     * 接著由於業務需要,對 rdd1 執行一次 map 操作,建立了一個 rdd2,而rdd2 中的資料僅是 rdd1的 value 而已,也就是說,rdd2 是 rdd1 的子集
     */

    val rdd1: RDD[(Long,String)] = ...
    val rdd2: RDD[String] = ...
    //對 rdd1 rdd2進行不同的操作
    rdd1.map(...)
    rdd2.filter(...)

    /**
     * 正確做法
     * rdd1 & rdd2 無非就是格式不同而已,rdd2完全是rdd1的子集,卻建立了rdd2,然後各自進行操作
     * 其實這種情況可以複用rdd1,我們可以使用 rdd1 進行一次操作,在執行第二次操作的時候直接使用 rdd1 的 value 部分即可
     * */
    rdd1.map(...)
    rdd1.filter(_.2...)

2.3、原則3 對多次使用的RDD進行持久化

當在Spark程式碼中對同一個RDD進行多次運算元操作時,那麼你已經學會了優化原則的第一步了,也就是說複用RDD,此時就在此基礎上進行二次優化,也就是說要保證對一個RDD進行多次操作時,這個RDD本身僅計算一次。Spark對於一個RDD執行多次運算元操作的原理是這樣的;每次對一個RDD進行一次運算元操作時,都會重新從源頭處計算一遍那個RDD來,然後在對這個RDD進行運算元操作,這種方式很影響作業效率。因此對於這種情況,我建議:對於多次使用的RDD進行持久化。此時Spark會根據你的持久化策略,將RDD的資料儲存到記憶體或磁碟中,以後每次對RDD進行操作時,會從持久化的資料中讀取RDD資料,。然後執行運算元,而不是衝源頭重新計算一遍RDD,再執行運算元。

 /**
     * 如果要對RDD進行持久化,只要呼叫對這個RDD呼叫 cache() Huo persist() 方法
     * 正確做法
     * cache() 使用非序列化方式將RDD持久化到記憶體中,,此時再對RDD進行兩次運算元操作時,只有在第一次執行map() 運算元時,才會對rdd 從源頭處計算一次
     * 第二次執行運算元時,就會直接從記憶體中提取資料進行計算,不會重複計算一個rdd
     */
    val rdd1: RDD[String] = sc.textFile("data/1.txt").cache()
    rdd1.map(...)
    rdd1.reduce(...)

    /**
     * persist() 方法表示 手動選擇持久化級別,並使用指定的方式進行持久化
     * 比如說,StorageLevel.MEMORY_AND_DISK_SER表示,記憶體充足時優先持久化到記憶體中,記憶體不充足時持久化到磁碟檔案中。
     *  而且其中的_SER字尾表示,使用序列化的方式來儲存RDD資料,此時RDD中的每個partition都會序列化成一個大的位元組陣列,然後再持久化到記憶體或磁碟中。
     * 序列化的方式可以減少持久化的資料對記憶體/磁碟的佔用量,進而避免記憶體被持久化資料佔用過多,從而發生頻繁GC。
     */
    val rdd2: RDD[String] = sc.textFile("data/1.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd2.map(...)
    rdd2.reduce(...)

 對於persist()方法而言,可以根據不同的業務場景選擇不同的持久化級別。

Spark 持久化級別

 持久化級別怎麼選擇;

1、預設情況下,效能最高的當然時 MEMORY_ONLY,但是前提記憶體必須足夠大,可以綽綽有餘存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分效能開銷,對這個RDD的後續運算元操作,都是基於記憶體中的資料操作的,不需要從磁碟檔案中讀取資料,效能也很高 ,而且不需要複製一份副本,並遠端傳送到其它節點,當時必須要注意,在實際生產環境,恐怕能夠直接使用這種測類的場景還是非常有限的,如果RDD的資料比較大,使用這種方式進行持久化,會導致OOM。

2、如果使用MEMORY_ONLY級別時發生了記憶體溢位,那麼建議使用MEMORY_ONLY_SER,這種級別會將RDD資料序列化後在進行持久化到記憶體中,此時每個分割槽僅僅是一個位元組陣列,大大減少了物件數量,並降低了記憶體佔用,這種級別比MEMORY_ONOLY 多出來的記憶體開銷,主要就是序列化與反序列化的開銷,當時後運算元可以基於純記憶體進行操作,因此效能總體來說比較高,此外,可能發生的事情也是OOM。

3、如果村記憶體的級別無法使用,那麼建議使用MEMORY_AND_DISK_SRE ,而不是 MEMORY_AND_DISK策略,因為既然到了這一步,說明RDD資料非常大,記憶體放不下了,序列化後的資料比較少,可以節省記憶體*磁碟的空間開銷,同時該策略會優先嚐試將資料持久化到記憶體中,記憶體放不下才會持久化到磁碟。

3、通常不建議使用DISK_ONLY 和字尾為_2 的持久化級別,因為完全基於磁碟的資料讀寫,會導致效能急劇降低,有時候還不如重新計算一次所有的RDD。字尾為_2 的級別,必須將所有資料都複製一份,併發送到其它幾點進行備份,資料複製以及會導致網路IO的較大開銷,除非是要求作業的極高可靠性,否則不建議使用。

2.4、原則4  儘量避免使用shuffle類運算元

如果有可能的話,儘量避免使用shuffle類運算元,在Spark作業執行中,最消耗資源的就是shuffle 過程,shuffle 過程簡單來說就是將分佈在叢集中的多個節點上的同一個key,拉取到一個節點上,比如reduceBykey 和 join 都是會觸發shuffle 操作。shuffle 過程中,每個節點上的相同key都會先寫入本地磁碟檔案,然後其它節點需要通過網路傳輸拉取各個節點上的磁碟檔案中相同key ,而且相同key都拉取到相同的一個節點進行聚合曹祖時,還有可能因為key 分佈不均,導致某節點上處理的key較多,導致記憶體不不夠存放,進而溢寫到磁碟檔案中,因此shuffle過程中,可能會發生大量的磁碟檔案讀寫和IO操作,以及資料的網路IO ,磁碟和網路IO也是shuffle效能較差的主要原因。因此在開發Spark中,能避免使用 reduceBykey,join,distinct,repartition 等會進行shuffle的運算元,儘量使用map類非shuffle運算元,這樣的話,沒有shuffle或者只有少量的shuffle 運算元的spark作業,可以大大減少效能開銷。

 /**
     * 傳統 join 會導致 shuffle,因為兩個RDD中,相同的key都需要通過網路拉取到一個節點上,由一個task進行join 操作
      */
    val rdd3 = rdd1.join(rdd2)
     //此時使用 broadcast + map 不會導致shuffle
    var rdd2Data=rdd2.collect()
    val rdd2DataBroadcast  = sc.broadcast(rdd2Data)
     var rdd3= rdd1.map(rdd2DataBroadcast....)
    //注意使用廣播變數,建議僅僅在 rdd2 資料量較少的場景下使用,<2G ,因為每個 Executor 的記憶體。都會駐留一份 rdd2 的全量資料

2.5、原則5  使用map-side預聚合的shuffle操作

如果因為業務場景需要一定需要shuffle操作,無法使用map類運算元迭代,那麼儘量使用map-suide預聚合類運算元。map-side預聚合指的是每個節點本地相同key進行一次聚合操作,類似於MR的本地combiner,map-side預聚合之後,每個節點本地只有一條相同的key,因為多條相同key被聚合起來了,其它節點在拉取所有節點相同key的資料時,就會大大減少所需拉取的資料量,從而減少磁碟/網路 IO,通常來說,在可能的情況下,建議使用reducebykey/aggrebykey運算元來替代groupByKey,因為groupByKey運算元不會進行預聚合,全量資料會在叢集各個節點之間進行分發傳輸,效能相對較差。比如下,下面兩個圖,分別基於reducebykey和groupByKey 進行wordcount,其中第一個圖是 groupByKey的執行邏輯,可以看到,沒有進行仁和==任何本地預聚合,所有資料都在叢集之間傳輸,第二張圖是reducebykey,可以看到,每個節點本地進行相同key的預聚合操作,然後才傳輸到其它節點上進行全域性聚合。

 

2.6、原則6  使用高效能運算元

2.6.1、使用 reduceByKey/aggregateByKey 替代 groupByKey

詳見原則5

2.6.2、使用 mapPartition 替代 map

mapPartition類運算元,一次函式呼叫會處理整個partition內部所有資料,而不是像map那樣只處理一條資料,效能上相對高出一些,但是有的時候,使用mapPartition會出現OOM,因為單詞函式呼叫就處理整個分割槽的資料,如果記憶體不足,垃圾回收時無法回收太多的物件,很可能發生OOM異常,所以使用mapPartitio時要注意當前RDD資料大俠以及當前叢集記憶體情況。

2.6.3、使用 foreachPartition 替代 foreach

原理類似於 “使用mapPartiti 代替 map”,也是一次函式呼叫處理整個partition的所有資料,而不是一次函式呼叫處理一條資料。在工作中,foreachPartition 類的運算元,對效能提升很有幫助,比如在foreach 函式中,將RDD的所有資料寫入MYSQL,那麼如果是普通的foreach運算元,就會一條資料一條資料的寫入,每次函式呼叫可能會建立一個數據庫連結,此時勢必頻繁的建立和消費資料庫連結,效能是非常低的。但是使用foreachPartition運算元一次處理一個分割槽的資料,那麼對於每個partition,只要建立一個數據庫連結就行了,然後執行資料批量寫入,此時效能是比較高的。對於1W條左右資料量寫入MySQL ,使用foreachP和使用foreach,效能有30%的提升。

2.6.4、使用 filter 之後在進行 coalesce

通常對一個RDD執行filter運算元過濾掉RDD中較多的資料後,比如需要過濾掉三分之一的資料,建議使用coalesce運算元,手動減少RDD的partition數量,將RDD中資料壓縮到更少的partitio中,因為filter之後,RDD的每個partition中都會有很多資料被過濾掉,此時如果照常進行後續的計算。其實每個task處理的partition中的資料並不是很多,有一點資源浪費,而且此時處理的task越多,速度越慢,因此coalesce減少partition的數量,將RDD中的資料壓縮到更少的partition之後,只要使用更少的task即可處理完所有的partition,在某些場景下,對於效能提升是有幫助的。

2.6.5、使用 repartitionAndSortWithinPartition 代替 repartition與sort

repartitionAndWithinPartition是Spark官網推薦的一個運算元,官方建議,如果要在repartition重分割槽之後,還需要進行排序,建議直接使用repartitionAndSortWithinPartition運算元。因為該運算元可以一邊進行重分割槽shuffle操作,一邊進行進行排序,shuffle和sort操作同時進行。比先shuffle在sort來說,效能有很大提升。

2.7、合理使用廣播變數

在平時工作中,會遇到在運算元函式中用到外部變數的場景(尤其是大變數,100M~2G的集合或其他資料),那麼此時就應該使用Spark的廣播變數(Brocast)來提升效能。在運算元中使用外部變數時,預設情況下,Spark會將該變數複製多個副本,通過網路傳輸到task中,此時每個task都有一個變數副本,如果變數本身比較大的話,那麼大量的副本在網路中傳輸的效能開銷,以及在各個節點的Executor中佔有的記憶體導致頻繁GC,都會較大的制約Spark的整體效能。因此對於上述情況,如果使用外部變數比較大時,見識使用Spark的廣播變數,對變數進行廣播,廣播後的變數,會保證每個Executor都有一份變數副本,而Executor中的task執行共享該Executor的那份變數副本,這樣的話可以大大的減少變數副本,從而減少網路傳輸的效能開銷,減少對Executor的記憶體開銷,降低GC頻率,從而提升Spark作業執行效率。

     //以下程式碼中運算元函式中使用外部變數,沒有特殊操作,每個task都會有一份list的副本
    var list=...
    rdd1.map(list)
    /**
     * 以下程式碼中將list封裝成了 Broadcast 型別的廣播變數,在運算元函式中,使用廣播變數時,首先判斷當前task所在的Executor記憶體中是否有變數副本
     * 如果有就直接使用,如果沒有則從Driver或者其他Executor節點上遠端拉取一份放到本地Executor 內訓中,每個Ex而粗頭兒記憶體只保留一份廣播變數
     */
    var list1=...
    val broadCastList   = sc.broadcast(list1)
    rdd1.map(broadCastList)   

2.8、使用 Kryo 序列化改善序列化效能

在spark中,涉及序列化的地方主要有三個

  1. 在運算元函式中使用外部變數,該變數會序列化後進行網路傳輸。
  2. 將自定義的型別作為RDD的泛型型別時(比如JavaRDD,Student是自定義型別),多有自定義型別的物件,都會進行序列化,因此這種場景下也要求自定義類要實現Serializable介面。
  3. 使用可序列化的持久化策略時(MEMORY_AND_DISK_SER),Spark 會將RDD中的每個Partition都序列化成一個位元組陣列。

對於這三種序列化的地方,我們都可以使用Kryo序列化庫,來優化序列化和反序列化的效能。Spark 預設使用的是Java序列化機制,也就是ObjectOutputStream/ObjectInputStream API 進行序列化和反序列化操作,但是Spark同時支援使用Kryo序列化庫,Kryo序列化類庫的效能比Java的序列化類庫的效能高很多,官方介紹,Kryo序列化機制比Java序列化機制效能高出10倍左右,Spark之所有沒有使用Kryo作為預設序列化機制的原因是,kryo 要求最好能註冊所欲需要進行序列化的自定義型別,因此對於使用者來說有很大麻煩。以下是使用Kryo的程式碼示例,我們只要設定序列化類,在註冊系列化的自定義型別即可(比如運算元函式中使用到的外部變數型別,作為RDD泛型型別的自定義型別)。

  private val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  //設定序列化為KryoSerializer
  conf.set("spark.serializer","org.apach.spark.serializer.KryoSerializer")
  //註冊要序列化的自定義型別
 conf.registerKryoClasses(Array(classOf[MyStudent],classOf[Person]))

2.9、優化資料結構

java中,有三種類型比較消耗記憶體

  1. 物件:每個java物件都有物件頭,引用等額外資訊,因此比較佔用記憶體空間。
  2. 字串:每個字串都一個字元陣列和額外資訊。
  3. 集合型別:比如HashMap.LinkedList,因為集合型別內部通常會使用一些內部類來封裝集合元素,比如 Map.Entry

因此 Spark 官方建議,在Spark開發中,特別是對於運算元函式中的程式碼,儘量不要使用上述的資料結構,儘量使用字串代替物件,使用原始型別(Int,Long)代替字串,使用陣列代替集合物件,這樣儘可能減少記憶體佔用。從而降低GC頻率,提升Spark效率。但是我在工作發現,要做到該原則其實很難,因為我們需要考慮程式碼的可維護性,如果一個程式碼中,完全沒有任何物件抽象,全是字串拼接的方式,那麼對於後續的程式碼維護,無疑是宜昌艱難的事情。同理所有操作都是陣列實現,而不使用HashMap或者LinkedList等集合型別,那麼對於我們的開發難度和程式碼維護也是一個極大的考研,因此我建議,在可能和適當的情況下,使用佔用記憶體較少的資料結構,但是前提是保證程式碼的可維護性。