1. 程式人生 > >spark的持久化和共享變量

spark的持久化和共享變量

執行 true ase 技術 str 明顯 情況 但是 滿足

1. 持久化算子cache

  介紹:正常情況下,一個RDD是不包含真實數據的,只包含描述這個RDD元數據信息,如果對這個RDD調用cache方法,那麽這個RDD的數據,依然沒有真實數據,直到第一次調用一個action的算子觸發了這個RDD的數據生成,那麽cache操作就會把數據存儲在內存中,所以第二次重復利用這個RDD的時候,計算速度將會快很多。
技術分享圖片
其中最主要的儲存級別為:

//不存儲在內存也不在磁盤
val NONE = new StorageLevel(false, false, false, false)
//存儲在磁盤
val DISK_ONLY = new StorageLevel(true, false, false, false)
//存儲在磁盤,保存2份
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
//存儲在內存
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
//存儲在內存 保存2份
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
//存儲在內存並序列化
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
//內存磁盤結合使用
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
//存儲在堆外內存
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

相應的操作:

        //設置持久化
        listRDD.cache()
        //移除持久化
        listRDD.unpersist()

2. 共享變量

  介紹:在 Spark 程序中,當一個傳遞給 Spark 操作(例如 map 和 reduce)的函數在遠程節點上面運行 時,Spark 操作實際上操作的是這個函數所用變量的一個獨立副本。這些變量會被復制到每臺機器上,並且這些變量在遠程機器上的所有更新都不會傳遞回驅動程序。通常跨任務的讀 寫變量是低效的,但是,Spark 還是為兩種常見的使用模式提供了兩種有限的共享變量:廣播變量

(Broadcast Variable)和累加器(Accumulator)。

 (1)廣播變量

在不使用廣播變量的時候:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //這一句代碼是在 driver中執行的,相當於這個變量是在driver進程中的。
        val a=3

        /**
          * kv._2+a這句代碼是在executor中執行的,
          * 其中a這個變量會在和f序列化的過程中,會攜帶過去。
          * 並且每一個task都會復制一份,可想而知如果這個a變量是一個大對象,那就是一個災難
          */
        listRDD.map(kv=>(kv._1,kv._2+a))
    }
}

技術分享圖片
使用廣播變量的時候:

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val list = List(("math", 18), ("hbase", 18), ("hive", 22), ("hive", 18))
        val listRDD: RDD[(String, Int)] = sc.parallelize(list)
        //這一句代碼是在 driver中執行的,相當於這個變量是在driver進程中的。
        val a=3
        //設置廣播變量,每一個executor中的task共享一個廣播變量
        val broadcast: Broadcast[Int] = sc.broadcast(a)
        listRDD.map(kv=>{
            //獲取廣播變量
            val aa=broadcast.value
            (kv._1,kv._2+aa)
        })
    }
}

技術分享圖片
總結:如果 executor 端用到了 Driver 的變量,如果不使用廣播變量在 Executor 有多少 task 就有 多少 Driver 端的變量副本。如果 Executor 端用到了 Driver 的變量,如果使用廣播變量在每個 Executor 中都只有一份 Driver 端的變量副本。
使用的廣播變量的條件
   - 廣播變量只能在driver端定義,不能在executor中定義
   - 在 Driver 端可以修改廣播變量的值,在 Executor 端無法修改廣播變量的值。
   - 廣播變量的值越大,使用廣播變量的優勢越明顯
   - task個數越多,使用廣播變量的優勢越明顯

 (2)累加器

   介紹:在 Spark 應用程序中,我們經常會有這樣的需求,如異常監控,調試,記錄符合某特性的數據的數目,這種需求都需要用到計數器,如果一個變量不被聲明為一個累加器,那麽它將在被改變時不會在 driver 端進行全局匯總,即在分布式運行時每個 task 運行的只是原始變量的一個副本,並不能改變原始變量的值,但是當這個變量被聲明為累加器後,該變量就會有分布式計數的功能。
案例

object SparktTest {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        conf.setAppName("SparktTest")
        conf.setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        //統計文件有多少行
        val hdfsRDD: RDD[String] = sc.textFile("/data/word.txt")
        //設置累加器
        val mysum: LongAccumulator = sc.longAccumulator("Mysum")
        hdfsRDD.map(line=>{
            mysum.add(1)
            line
        }).collect() //觸發提交操作
        //獲取累加器的值
        println(mysum.value)
        //重置累加器
        mysum.reset()
    }
}

使用累加器的註意事項
   - 累加器在 Driver 端定義賦初始值,累加器只能在 Driver 端讀取最後的值,在 Excutor 端更新。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 累加器不是一個調優的操作,因為如果不這樣做,結果是錯的。
   - 如果系統自帶的累加器不能滿足要求,還可以自定義累加器

spark的持久化和共享變量