1. 程式人生 > 實用技巧 >spark相關優化

spark相關優化

標題 spark開發調優

1.高效能序列化類庫
在Spark中,預設是使用Java自帶的序列化機制——基於ObjectInputStream和ObjectOutputStream的序列化機制,這是為了提高便捷性和適用性,畢竟是Java原生的嘛。然鵝,自帶的東西往往考慮的東西比較多,沒法做到樣樣俱全,比如內序列化後佔據的記憶體還是較大,但是Spark是基於記憶體的大資料框架,對記憶體的要求很高。所以,在Spark應用程式中,Java自帶的序列化庫的效率有點差強人意。需求是從實際出發的嘛,最終Spark也提供了另外一種序列化機制——Kryo序列化機制。

Kryo序列化機制比Java序列化機制更快,序列化後的資料佔的記憶體更小。那麼Kryo序列化機制這麼好,為什麼不選用它是預設序列化庫呢?這裡提一句話“人無完人,誰能無錯”,Kryo序列化機制也樣,之所以不選用它為預設序列化機制是因為有些型別雖然實現了Seriralizable介面,但是不一定能夠進行序列化;此外,如果要得到最佳的效能,需要在Spark應用程式中,對所有 需要序列化的型別都進行註冊。

使用Kryo序列化機制的方法: 1.給SparkConf加入一個引數 SparkConf().set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)

2.對需要序列化的類自行進行註冊(因為如果不註冊,Kryo必須一直儲存型別的全限定名,會佔用記憶體。Spark預設是對Scala中常用的型別自動註冊了Kryo的,都在AllScalaRegistry類中) Scala版本:

val conf = new SparkConf().setMaster(…).setAppName(…)
conf.registerKryoClasses(Array(classOf[Counter] ))

val sc = new SparkContext(conf)
Java版本:

SparkConf conf = new SparkConf().setMaster(…).setAppName(…)
conf.registerKryoClasses(Counter.class)
JavaSparkContext sc = new JavaSparkContext(conf)
還可以對Kryo序列化機制進行優化達到更優的效果。

1、優化快取大小。如果註冊的要序列化的自定義的型別,本身很大大,比如包含了超過100個field。會導致要序列化的物件過大。此時需要對Kryo本身進行優化。因為Kryo內部的快取可能不夠存放這麼大的class物件。此時需要呼叫SparkConf.set()方法,設定spark.kryoserializer.buffer.mb引數的值,將其調大以適用。預設情況下spark.kryoserializer.buffer.mb是2,即最大能快取2M的物件,然後進行序列化。可以在必要時將其調大。比如設定為10。

2、預先註冊自定義型別。雖然不註冊自定義型別,Kryo類庫也能正常工作,但是那樣對於它要序列化的每個物件,都會儲存一份它的全限定類名。反而會耗費大量記憶體。因此通常都預先註冊好要序列化的自定義的類。

總結,需要用到Kryo序列化機制的場景,運算元內部使用了外部的大物件或者大資料結構。那麼可以切換到Kryo序列化,序列化速度更快,和獲得更小的序列化資料,減少記憶體的消耗。
2.優化資料結構
對資料結構的優化,主要是針對Java資料結構(如果用scala開發的話,其實原理也一樣的)。其實就是運算元裡面的區域性變數或者運算元函式外部的資料結構。比如基於鏈式結構的資料結構、包裝型別的資料結構等,它們在除了本身的資料之外,還會有額外的資料資訊來維持它們的資料型別,這樣就會比預想佔有更大的記憶體。

以下是一些優化建議:

1、能使用陣列或字串就不要用集合類。即優先使用Array,退而求次才是ArrayList、LinkedList、HashMap、HashTable等。熟悉Java語言的都知道集合類一般是泛型的,然鵝泛型的型別是包裝類,比如List list = new ArrayList(),就會因為包裝類而佔有額外的記憶體,最後佔有更多的額外開銷。在生產開發中的做法是,對於HashMap、List這種資料,統一用String拼接成特殊格式的字串。如Map<Integer, Person> persons = new HashMap<Integer, Person>()。可以優化為,特殊的字串格式:id:name,address|id:name,address…

2、避免使用多層巢狀的物件結構。public class Teacher { private List students = new ArrayList() }。就是非常不好的例子。因為Teacher類的內部又嵌套了大量的小Student物件。比如說,對於上述例子,也完全可以使用特殊的字串來進行資料的儲存。比如,用json字串來儲存資料,就是一個很好的選擇。{“teacherId”: 1, “teacherName”: “leo”, students:[{“studentId”: 1, “studentName”: “tom”},{“studentId”:2, “studentName”:“marry”}]}

3、能用int就不用String。雖然String比集合咧更高效,但是之前說過Java的String是佔2個位元組的,使用int會優化記憶體。

總結,在寫Spark程式的時候,要牢牢記住,儘量壓榨因語言帶來的記憶體開銷,達到節約記憶體的目的。

4.對多次使用的RDD進行持久化或Checkpoint

1、對一個RDD,基於它進行了多次transformation或者action操作。非常有必要對其進行持久化操作,以避免對一個RDD反覆進行計算。

2、如果要保證在RDD的持久化資料可能丟失的情況下,還要保證高效能,那麼可以對RDD進行Checkpoint操作。

5.使用序列化的持久化級別

RDD的資料是持久化到記憶體,或者磁碟中的。但是如果機器的記憶體大小不是很充足,或者有時為了節省機器的記憶體開銷,比如在生產環境下,機器不單單是跑這麼一個Spark應用的,還需要留些記憶體供其他應用使用。這種情況下,可以使用序列化的持久化級別。比如MEMORYONLYSER、MEMORYANDDISKSER等。用法是:RDD.persist(StorageLevel.MEMORYONLY_SER)。

將資料序列化之後,再持久化,可以大大減小對記憶體的消耗。此外,資料量小了之後,如果要寫入磁碟,磁碟io效能消耗也比較小。

對RDD持久化序列化後,RDD的每個partition的資料,都是序列化為一個巨大的位元組陣列。這樣,對於記憶體的消耗就小了。但是唯一的缺點是獲取RDD資料時,需要對其進行反序列化,會增大其效能開銷。這種情況下可以使用第二點的Kryo序列化機制配合,提高序列化的效率。
在這裡插入圖片描述
6.提高並行度
在實際使用Spark叢集的時候,很多時候對於叢集的資源並不是一定會被充分利用到,這是由於task和cpu核的協調不好導致的。要想合理的“榨乾”叢集的資源和效能,可以合理的設定Spark應用程式執行的並行度,來充分地利用叢集的資源,這樣才能充分的提高Spark應用程式的效能。

Spark的資料來源有兩種,一種是外部的,比如HDFS等分散式檔案系統,或者通過現有的陣列等資料結構序列化而成;一種是通過已有的RDD轉換而來的。這裡以Spark讀取HDFS的資料為例子。Spark會根據讀取HDFS的時候把每個block劃分為一個Partition,其實也是按照這個來自動設定並行度的。對於reduceByKey等會發生shuffle的運算元操作,會使用並行度最大的父RDD的並行度作為Spark應用的並行度。

通過上面的分析,我們可以手動設定並行度,在讀取HDFS或者並行化資料的時候呼叫textFile()和parallelize()等方法的時候,通過第二個引數來設定並行度。也可以使用spark.default.parallelism引數,來設定統一的並行度。根據Spark官方的推薦,最優的方案是給叢集中的每個cpu core設定23個task,也就是task的數量是cpu核的23倍。

以下是實現例子:現在已知cpu core有10個。比如spark-submit設定了executor數量是2個,每個executor有5個core。但是在Spark應用程式中這樣設定了SparkConf().set(“spark.default.parallelism”, “5”),那麼application總共會有5個core。實際上所有的RDD都被設為了partition為5,也就是每個RDD的資料分為5份,也就是5份資料(partition)成為5個task分配到這兩個executor中。很明顯,Spark應用程式在執行的時候,只佔用了5個cpu core,還剩下5個cpu core是沒用到的,浪費了叢集資源。此時可以設定這樣來優化Spark的叢集效能,通過設定引數 SparkConf().set(“spark.default.parallelism”, “30”)來設定合理的並行度,從而充分利用資源
7.廣播共享資料
RDD實質是彈性分散式資料集,在每個節點中的每個task(一個節點可以有很多個task)操作的只是RDD的一部分資料,如果RDD運算元操作使用到了運算元函式外部的一份大資料的時候,實際上是Spark應用程式把資料檔案通過driver傳送給每一個節點的每一個task,很明顯,這樣會造成大量的網路IO操作,大量消耗節點上的記憶體。其實很容易想到,把一份大資料檔案傳送給每個節點就OK了,單個節點的所有task共享一份資料,這樣就會節省大量的網路IO操作和節省大量記憶體消耗。

如果運算元函式中,使用到了特別大的資料(比如一份大的配置檔案)供每個節點的所有task使用,可以藉助Spark提供的共享變數。共享變數有兩種,一是廣播變數,一是累加器。廣播變數是隻讀的,通常用來提供一份資料給所有的節點,每個節點的task訪問訪問同一份資料。而累加器是可寫可讀的,一個累加器一般是用於所有節點對用一個簡單的整型變數進行共享累加,共同維護一份資料。這樣的話,就不至於將一個大資料拷貝到每一個task上去。而是給每個節點拷貝一份,然後節點上的task共享該資料。
8.資料本地化
Spark資料本地化的基本原理

Spark和MapReduce是如今兩個最流行的大資料框架,它們的原理都是計算移動,而資料不移動,計算找資料。這樣做的創新性是避免了大量資料的網路傳輸造成網路IO和記憶體的消耗。因此引出一個叫“資料本地化”的概念。

資料本地化對於Spark Job效能有著巨大的影響。如果資料以及要計算它的程式碼是在同一個節點,效能會非常高。但是,如果資料和計算它的程式碼是位於不同的節點,那麼其中之一必須到另外一方的機器上。通常來說,移動程式碼到其他節點,會比移動資料到程式碼所在的節點上去,速度要快得多,因為程式碼比較小。Spark也正是基於這個資料本地化的原則來構建task排程演算法的。

資料本地化,指的是,資料離計算它的程式碼有多近。基於資料距離程式碼的距離,有幾種資料本地化級別:

1、PROCESS_LOCAL:資料和計算它的程式碼在同一個JVM程序中。
2、NODE_LOCAL:資料和計算它的程式碼在一個節點上,但是不在一個程序中,比如在不同的executor程序中,或者是資料在HDFS檔案的block中。
3、NO_PREF:資料從哪裡過來,效能都是一樣的。
4、RACK_LOCAL:資料和計算它的程式碼在一個機架上。
5、ANY:資料可能在任意地方,比如其他網路環境內,或者其他機架上。
Spark資料本地化的特點

Spark傾向於使用最好的本地化級別來排程task,但並不是每次都會使用最好的本地化資料的。在實際中,如果沒有任何未處理的資料在空閒的executor上,Spark會放低本地化級別。這時有兩個選擇:第一,driver等待,直到executor上的cpu釋放出來,就分配task等資源給這個executor;第二,立即在任意一個executor上啟動一個task。

Spark會預設等待一段時間(這個事件可以通過引數來設定),來期望在task要處理的資料所在的節點上的executor空閒出一個cpu,從而為其分配task鞥資源。但只要超過了時間,Spark就會將task分配到其他任意一個空閒的executor上。

可以設定引數,spark.locality系列引數,來調節Spark等待task可以進行資料本地化的時間。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack

針對以上的分析,我們可以這樣調優,增大查詢本地化資料的超時時間和重試次數,因為時間更長更利於查詢本地化資料的節點的executor,重試次數越多,更多機會嘗試查詢本地化資料的節點的executor。

調優方式,主要是spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack這些引數,具體的根據實際的業務需求來控制引數就OK了。

9.reduceByKey和groupByKey的選擇

以下兩種方式是等價的,但是實現的原理卻不相同。reduceByKey,因為它會在map端,先進行本地combine,可以大大減少要傳輸到reduce端的資料量,減小網路傳輸的開銷。而groupByKey運算元卻不會這樣優化。所以只有在reduceByKey處理不了時,才用groupByKey().map()來替代。

val counts = pairs.reduceByKey(_ + _)
val counts = pairs.groupByKey().map(wordCounts => (wordCounts._1, wordCounts._2.sum))
10.shuffle效能優化

無論是MapReduce還是Spark,Shuffle階段是最重要的階段,它的好壞影響著整個Spark的效能。其實Shuffle階段的調優,可以從以下的引數入手:
new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)
spark.shuffle.consolidateFiles:是否開啟shuffle block file的合併,預設為false
spark.reducer.maxSizeInFlight:reduce task的拉取快取,預設48m
spark.shuffle.file.buffer:map task的寫磁碟快取,預設32k
spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,預設3次
spark.shuffle.io.retryWait:拉取失敗的重試間隔,預設5s
spark.shuffle.memoryFraction:用於reduce端聚合的記憶體比例,預設0.2,超過比例就會溢位到磁碟上
這個是沒有開啟consolidateFiles優化(Spark1.3之後加入的),會產生大量的磁碟檔案,在寫磁碟和result task拉取資料的時候,會浪費過多的系統資源。

開啟consolidateFiles優化

優化方法:

開啟consolidateFiles,增大result task的拉取快取,增大shufflemaptask的寫磁碟快取,增大重試次數和重試間隔,調大用於reduce端聚合的記憶體比例
new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)
spark.shuffle.consolidateFiles:是否開啟shuffle block file的合併,預設為false
spark.reducer.maxSizeInFlight:ResultTask的拉取快取,預設48m
spark.shuffle.file.buffer:map task的寫磁碟快取,預設32k
spark.shuffle.io.maxRetries:拉取失敗的最大重試次數,預設3次
spark.shuffle.io.retryWait:拉取失敗的重試間隔,預設5s
spark.shuffle.memoryFraction:用於reduce端聚合的記憶體比例,預設0.2,超過比例就會溢位到磁碟上

影響一個Spark作業效能的因素,主要還是程式碼開發、資源引數以及資料傾斜,shuffle調優只能在整個Spark的效能調優中佔到一小部分。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩衝,每次都只能拉取與buffer緩衝(這個快取大小可以通過上面的引數來設定)相同大小的資料,然後通過記憶體中的一個Map進行聚合等操作。聚合完一批資料後,再拉取下一批資料,並放到buffer緩衝中進行聚合操作。一直迴圈,直到最後將所有資料到拉取完,並得到最終的結果。

開啟consolidate機制之後,在shuffle write過程中,task就不是為下游stage的每個task建立一個磁碟檔案了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁碟檔案,磁碟檔案的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會建立一個shuffleFileGroup,並將資料寫入對應的磁碟檔案內。

當Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會複用之前已有的shuffleFileGroup,包括其中的磁碟檔案。也就是說,此時task會將資料寫入已有的磁碟檔案中,而不會寫入新的磁碟檔案中。因此,consolidate機制允許不同的task複用同一批磁碟檔案,這樣就可以有效將多個task的磁碟檔案進行一定程度上的合併,從而大幅度減少磁碟檔案的數量,進而提升shuffle write的效能。