Spark 持久化和共享變數
一、Spark RDD持久化
RDD持久化工作原理
Spark非常重要的一個功能特性就是可以將RDD持久化在記憶體中。當對RDD執行持久化操作時,每個節點都會將自己操作的RDD的partition持久化到記憶體中,並且在之後對該RDD的反覆使用中,直接使用記憶體快取的partition。這樣的話,對於針對一個RDD反覆執行多個操作的場景,就只要對RDD計算一次即可,後面直接使用該RDD,而不需要反覆計算多次該RDD。
巧妙使用RDD持久化,甚至在某些場景下,可以將spark應用程式的效能提升10倍。對於迭代式演算法和快速互動式應用來說,RDD持久化,是非常重要的。
要持久化一個RDD,只要呼叫其cache()或者persist()方法即可。在該RDD第一次被計算出來時,就會直接快取在每個節點中。而且Spark的持久化機制還是自動容錯的,如果持久化的RDD的任何partition丟失了,那麼Spark會自動通過其源RDD,使用transformation操作重新計算該partition。
cache()和persist()的區別在於,cache()是persist()的一種簡化方式,cache()的底層就是呼叫的persist()的無參版本,同時就是呼叫persist(MEMORY_ONLY),將資料持久化到記憶體中。如果需要從記憶體中去除快取,那麼可以使用unpersist()方法。
RDD持久化使用場景
1、第一次載入大量的資料到RDD中
2、頻繁的動態更新RDD Cache資料,不適合使用Spark Cache、Spark lineage
RDD持久化策略
持久化級別 | 說明 |
---|---|
MEMORY_ONLY | 將RDD以非序列化的Java物件儲存在JVM中。 如果沒有足夠的記憶體儲存RDD,則某些分割槽將不會被快取,每次需要時都會重新計算。 這是預設級別。 |
MEMORY_AND_DISK | 將RDD以非序列化的Java物件儲存在JVM中。如果資料在記憶體中放不下,則溢寫到磁碟上.需要時則會從磁碟上讀取 |
MEMORY_ONLY_SER (Java and Scala) | 將RDD以序列化的Java物件(每個分割槽一個位元組陣列)的方式儲存.這通常比非序列化物件(deserialized objects)更具空間效率,特別是在使用快速序列化的情況下,但是這種方式讀取資料會消耗更多的CPU。 |
MEMORY_AND_DISK_SER (Java and Scala) | 與MEMORY_ONLY_SER 類似,但如果資料在記憶體中放不下,則溢寫到磁碟上,而不是每次需要重新計算它們。 |
DISK_ONLY | 將RDD分割槽儲存在磁碟上。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2等 | 與上面的儲存級別相同,只不過將持久化資料存為兩份,備份每個分割槽儲存在兩個叢集節點上。 |
持久化策略的選擇
預設情況下,效能最高的當然是MEMORY_ONLY,但前提是你的記憶體必須足夠足夠大,可以綽綽有餘地存放下整個RDD的所有資料。因為不進行序列化與反序列化操作,就避免了這部分的效能開銷;對這個RDD的後續運算元操作,都是基於純記憶體中的資料的操作,不需要從磁碟檔案中讀取資料,效能也很高;而且不需要複製一份資料副本,並遠端傳送到其他節點上。但是這裡必須要注意的是,在實際的生產環境中,恐怕能夠直接用這種策略的場景還是有限的,如果RDD中資料比較多時(比如幾十億),直接用這種持久化級別,會導致JVM的OOM記憶體溢位異常。
如果使用MEMORY_ONLY級別時發生了記憶體溢位,那麼建議嘗試使用MEMORY_ONLY_SER級別。該級別會將RDD資料序列化後再儲存在記憶體中,此時每個partition僅僅是一個位元組陣列而已,大大減少了物件數量,並降低了記憶體佔用。這種級別比MEMORY_ONLY多出來的效能開銷,主要就是序列化與反序列化的開銷。但是後續運算元可以基於純記憶體進行操作,因此效能總體還是比較高的。此外,可能發生的問題同上,如果RDD中的資料量過多的話,還是可能會導致OOM記憶體溢位的異常。
如果純記憶體的級別都無法使用,那麼建議使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因為既然到了這一步,就說明RDD的資料量很大,記憶體無法完全放下。序列化後的資料比較少,可以節省記憶體和磁碟的空間開銷。同時該策略會優先儘量嘗試將資料快取在記憶體中,記憶體快取不下才會寫入磁碟。
通常不建議使用DISK_ONLY和字尾為_2的級別:因為完全基於磁碟檔案進行資料的讀寫,會導致效能急劇降低,有時還不如重新計算一次所有RDD。字尾為_2的級別,必須將所有資料都複製一份副本,併發送到其他節點上,資料複製以及網路傳輸會導致較大的效能開銷,除非是要求作業的高可用性,否則不建議使用。
測試案例
測試程式碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark RDD的持久化
*/
object _01SparkPersistOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
var start = System.currentTimeMillis()
val linesRDD = sc.textFile("D:/data/spark/sequences.txt")
// linesRDD.cache()
// linesRDD.persist(StorageLevel.MEMORY_ONLY)
// 執行第一次RDD的計算
val retRDD = linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
// retRDD.cache()
// retRDD.persist(StorageLevel.DISK_ONLY)
retRDD.count()
println("第一次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms")
// 執行第二次RDD的計算
start = System.currentTimeMillis()
// linesRDD.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).count()
retRDD.count()
println("第二次計算消耗的時間:" + (System.currentTimeMillis() - start) + "ms")
// 持久化使用結束之後,要想解除安裝資料
// linesRDD.unpersist()
sc.stop()
}
}
設定相關的持久化策略,再觀察執行時間就可以有一個較為直觀的理解。
二、共享變數
提供了兩種有限型別的共享變數,廣播變數和累加器。
介紹之前,先直接看下面一個例子:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
/**
* 共享變數
* 我們在dirver中宣告的這些區域性變數或者成員變數,可以直接在transformation中使用,
* 但是經過transformation操作之後,是不會將最終的結果重新賦值給dirver中的對應的變數。
* 因為通過action,觸發了transformation的操作,transformation的操作,都是通過
* DAGScheduler將程式碼打包 序列化 交由TaskScheduler傳送到各個Worker節點中的Executor去執行,
* 在transformation中執行的這些變數,是自己節點上的變數,不是dirver上最初的變數,我們只不過是將
* driver上的對應的變數拷貝了一份而已。
*
*
* 這個案例也反映出,我們需要有一些操作對應的變數,在driver和executor上面共享
*
* spark給我們提供了兩種解決方案——兩種共享變數
* 廣播變數
* 累加器
*/
object _02SparkShareVariableOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val linesRDD = sc.textFile("D:/data/spark/hello.txt")
val wordsRDD = linesRDD.flatMap(_.split(" "))
var num = 0
val parisRDD = wordsRDD.map(word => {
num += 1
println("map--->num = " + num)
(word, 1)
})
val retRDD = parisRDD.reduceByKey(_ + _)
println("num = " + num)
retRDD.foreach(println)
println("num = " + num)
sc.stop()
}
}
輸出結果如下:
num = 0
map--->num = 1
map--->num = 1
map--->num = 2
map--->num = 2
map--->num = 3
map--->num = 4
(hello,3)
(you,1)
(me,1)
(he,1)
num = 0
2.1 廣播變數
Spark的另一種共享變數是廣播變數。通常情況下,當一個RDD的很多操作都需要使用driver中定義的變數時,每次操作,driver都要把變數傳送給worker節點一次,如果這個變數中的資料很大的話,會產生很高的傳輸負載,導致執行效率降低。使用廣播變數可以使程式高效地將一個很大的只讀資料傳送給多個worker節點,而且對每個worker節點只需要傳輸一次,每次操作時executor可以直接獲取本地儲存的資料副本,不需要多次傳輸。
這樣理解, 一個worker中的executor,有5個task執行,假如5個task都需要這從份共享資料,就需要向5個task都傳遞這一份資料,那就十分浪費網路資源和記憶體資源了。使用了廣播變數後,只需要向該worker傳遞一次就可以了。
建立並使用廣播變數的過程如下:
在一個型別T的物件obj上使用SparkContext.brodcast(obj)方法,建立一個Broadcast[T]型別的廣播變數,obj必須滿足Serializable。 通過廣播變數的.value()方法訪問其值。 另外,廣播過程可能由於變數的序列化時間過程或者序列化變數的傳輸過程過程而成為瓶頸,而Spark Scala中使用的預設的Java序列化方法通常是低效的,因此可以通過spark.serializer屬性為不同的資料型別實現特定的序列化方法(如Kryo)來優化這一過程。
測試程式碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
/**
* 使用Spark廣播變數
*
* 需求:
* 使用者表:
* id name age gender(0|1)
*
* 要求,輸出使用者資訊,gender必須為男或者女,不能為0,1
*/
object _03SparkBroadcastOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
val userList = List(
"001,劉向前,18,0",
"002,馮 劍,28,1",
"003,李志傑,38,0",
"004,郭 鵬,48,2"
)
val genderMap = Map("0" -> "女", "1" -> "男")
val genderMapBC:Broadcast[Map[String, String]] = sc.broadcast(genderMap)
val userRDD = sc.parallelize(userList)
val retRDD = userRDD.map(info => {
val prefix = info.substring(0, info.lastIndexOf(",")) // "001,劉向前,18"
val gender = info.substring(info.lastIndexOf(",") + 1)
val genderMapValue = genderMapBC.value
val newGender = genderMapValue.getOrElse(gender, "男")
prefix + "," + newGender
})
retRDD.foreach(println)
sc.stop()
}
}
輸出結果如下:
001,劉向前,18,女
003,李志傑,38,女
002,馮 劍,28,男
004,郭 鵬,48,男
當然這個案例只是演示一下程式碼的使用,並不能看出其執行的機制。
不過可以分析一下其原理,假如在執行map操作時,在某個Worker的一個Executor上有分配5個task來進行計算,在不使用廣播變數的情況下,因為Driver會將我們的程式碼通過DAGScheduler劃分會不同stage,交由taskScheduler,taskScheduler再將封裝好的一個個task分發到Worker的Excutor中,也就是說,這個過程當中,我們的genderMap也會被封裝到這個task中,顯然這個過程的粒度是task級別的,每個task都會封裝一個genderMap,在該變數資料量不大的情況下,是沒有問題的,然後,當資料量很大時,同時向一個Excutor上傳遞5份這樣相同的資料,這是很浪費網路中的頻寬資源的;廣播變數的使用可以避免這一問題的發生,將genderMap廣播出去之後,其只需要傳送給Excutor即可,它會儲存在Excutor的BlockManager中,此時,Excutor下面的task就可以共享這個變量了,這顯然可以帶來一定效能的提升。
注意:
- 不能廣播RDD,但是可以將RDD的結果廣播出去;
- 廣播變數只能在Driver端定義,在Excutor端使用,不能在Executor端更改變數值;
- 如果不使用廣播變數,在Executor端有幾個task,就會有幾個Driver端變數的副本;使用廣播變數,在每個Executor中只有一個Driver端的副本,由BlockManager物件管理;
2.2 累加器
Spark提供的Accumulator,主要用於多個節點對一個變數進行共享性的操作。Accumulator只提供了累加的功能。但是確給我們提供了多個task對一個變數並行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取它的值。只有Driver程式可以讀取Accumulator的值。
非常類似於在MR中的一個Counter計數器,主要用於統計各個程式片段被呼叫的次數,和整體進行比較,來對資料進行一個評估。
測試程式碼如下:
package cn.xpleaf.bigdata.spark.scala.core.p3
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Spark共享變數之累加器Accumulator
*
* 需要注意的是,累加器的執行必須需要Action觸發
*/
object _04SparkAccumulatorOps {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkPersistOps.getClass.getSimpleName())
val sc = new SparkContext(conf)
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
// 要對這些變數都*7,同時統計能夠被3整除的數字的個數
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13)
val listRDD:RDD[Int] = sc.parallelize(list)
var counter = 0
val counterAcc = sc.accumulator[Int](0)
val mapRDD = listRDD.map(num => {
counter += 1
if(num % 3 == 0) {
counterAcc.add(1)
}
num * 7
})
// 下面這種操作又執行了一次RDD計算,所以可以考慮上面的方案,減少一次RDD的計算
// val ret = mapRDD.filter(num => num % 3 == 0).count()
mapRDD.foreach(println)
println("counter===" + counter)
println("counterAcc===" + counterAcc.value)
sc.stop()
}
}
輸出結果如下:
49
56
7
63
14
70
21
77
28
84
35
91
42
counter===0
counterAcc===4