spark的持久化和共享變量
介紹:正常情況下,一個RDD是不包含真實數據的,只包含描述這個RDD元數據信息,如果對這個RDD調用cache方法,那麽這個RDD的數據,依然沒有真實數據,直到第一次調用一個action的算子觸發了這個RDD的數據生成,那麽cache操作就會把數據存儲在內存中,所以第二次重復利用這個RDD的時候,計算速度將會快很多。
其中最主要的儲存級別為:
//不存儲在內存也不在磁盤 val NONE = new StorageLevel(false, false, false, false) //存儲在磁盤 val DISK_ONLY = new StorageLevel(true, false, false, false) //存儲在磁盤,保存2份 val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) //存儲在內存 val MEMORY_ONLY = new StorageLevel(false, true, false, true) //存儲在內存 保存2份 val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) //存儲在內存並序列化 val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) //內存磁盤結合使用 val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) //存儲在堆外內存 val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
相應的操作:
//設置持久化
listRDD.cache()
//移除持久化
listRDD.unpersist()
2. 共享變量
介紹:在 Spark 程序中,當一個傳遞給 Spark 操作(例如 map 和 reduce)的函數在遠程節點上面運行 時,Spark 操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器上,並且這些變量在遠程機器上的所有更新都不會傳遞回驅動程序。通常跨任務的讀 寫變量是低效的,但是,Spark 還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量
(1)廣播變量
在不使用廣播變量的時候:
object SparktTest { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() conf.setAppName("SparktTest") conf.setMaster("local[2]") val sc: SparkContext = new SparkContext(conf) val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18)) val listRDD: RDD[(String, Int)] = sc.parallelize(list) //這一句代碼是在 driver中執行的,相當於這個變量是在driver進程中的。 val a=3 /** * kv._2+a這句代碼是在executor中執行的, * 其中a這個變量會在和f序列化的過程中,會攜帶過去。 * 並且每一個task都會復制一份,可想而知如果這個a變量是一個大對象,那就是一個災難 */ listRDD.map(kv=>(kv._1,kv._2+a)) } }
使用廣播變量的時候:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
val listRDD: RDD[(String, Int)] = sc.parallelize(list)
//這一句代碼是在 driver中執行的,相當於這個變量是在driver進程中的。
val a=3
//設置廣播變量,每一個executor中的task共享一個廣播變量
val broadcast: Broadcast[Int] = sc.broadcast(a)
listRDD.map(kv=>{
//獲取廣播變量
val aa=broadcast.value
(kv._1,kv._2+aa)
})
}
}
總結:如果 executor 端用到了 Driver 的變量,如果不使用廣播變量在 Executor 有多少 task 就有 多少 Driver 端的變量副本。如果 Executor 端用到了 Driver 的變量,如果使用廣播變量在每個 Executor 中都只有一份 Driver 端的變量副本。
使用的廣播變量的條件:
- 廣播變量只能在driver端定義,不能在executor中定義
- 在 Driver 端可以修改廣播變量的值,在 Executor 端無法修改廣播變量的值。
- 廣播變量的值越大,使用廣播變量的優勢越明顯
- task個數越多,使用廣播變量的優勢越明顯
(2)累加器
介紹:在 Spark 應用程序中,我們經常會有這樣的需求,如異常監控,調試,記錄符合某特性的數據的數目,這種需求都需要用到計數器,如果一個變量不被聲明為一個累加器,那麽它將在被改變時不會在 driver 端進行全局匯總,即在分布式運行時每個 task 運行的只是原始變量的一個副本,並不能改變原始變量的值,但是當這個變量被聲明為累加器後,該變量就會有分布式計數的功能。
案例:
object SparktTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf()
conf.setAppName("SparktTest")
conf.setMaster("local[2]")
val sc: SparkContext = new SparkContext(conf)
//統計文件有多少行
val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
//設置累加器
val mysum: LongAccumulator = sc.longAccumulator("Mysum")
hdfsRDD.map(line=>{
mysum.add(1)
line
}).collect() //觸發提交操作
//獲取累加器的值
println(mysum.value)
//重置累加器
mysum.reset()
}
}
使用累加器的註意事項
- 累加器在 Driver 端定義賦初始值,累加器只能在 Driver 端讀取最後的值,在 Excutor 端更新。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
- 如果系統自帶的累加器不能滿足要求,還可以自定義累加器
spark的持久化和共享變量