1. 程式人生 > 實用技巧 >spark學習進度12(RDD的Checkpoint)

spark學習進度12(RDD的Checkpoint)

1. Checkpoint

目標
  1. Checkpoint 的作用

  2. Checkpoint 的使用

1.1. Checkpoint 的作用

Checkpoint 的主要作用是斬斷 RDD 的依賴鏈, 並且將資料儲存在可靠的儲存引擎中, 例如支援分散式儲存和副本機制的 HDFS.

Checkpoint 的方式
  • 可靠的將資料儲存在可靠的儲存引擎中, 例如 HDFS

  • 本地的將資料儲存在本地

什麼是斬斷依賴鏈

斬斷依賴鏈是一個非常重要的操作, 接下來以 HDFS 的 NameNode 的原理來舉例說明

HDFS 的 NameNode 中主要職責就是維護兩個檔案, 一個叫做edits

, 另外一個叫做fsimage.edits中主要存放EditLog,FsImage儲存了當前系統中所有目錄和檔案的資訊. 這個FsImage其實就是一個Checkpoint.

HDFS 的 NameNode 維護這兩個檔案的主要過程是, 首先, 會由fsimage檔案記錄當前系統某個時間點的完整資料, 自此之後的資料並不是時刻寫入fsimage, 而是將操作記錄儲存在edits檔案中. 其次, 在一定的觸發條件下,edits會將自身合併進入fsimage. 最後生成新的fsimage檔案,edits重置, 從新記錄這次fsimage以後的操作日誌.

如果不合並edits進入fsimage

會怎樣? 會導致edits中記錄的日誌過長, 容易出錯.

所以當 Spark 的一個 Job 執行流程過長的時候, 也需要這樣的一個斬斷依賴鏈的過程, 使得接下來的計算輕裝上陣.

Checkpoint 和 Cache 的區別

Cache 可以把 RDD 計算出來然後放在記憶體中, 但是 RDD 的依賴鏈(相當於 NameNode 中的 Edits 日誌)是不能丟掉的, 因為這種快取是不可靠的, 如果出現了一些錯誤(例如 Executor 宕機), 這個 RDD 的容錯就只能通過回溯依賴鏈, 重放計算出來.

但是 Checkpoint 把結果儲存在 HDFS 這類儲存中, 就是可靠的了, 所以可以斬斷依賴, 如果出錯了, 則通過複製 HDFS 中的檔案來實現容錯.

所以他們的區別主要在以下兩點

  • Checkpoint 可以儲存資料到 HDFS 這類可靠的儲存上, Persist 和 Cache 只能儲存在本地的磁碟和記憶體中

  • Checkpoint 可以斬斷 RDD 的依賴鏈, 而 Persist 和 Cache 不行

  • 因為 CheckpointRDD 沒有向上的依賴鏈, 所以程式結束後依然存在, 不會被刪除. 而 Cache 和 Persist 會在程式結束後立刻被清除.

1.2. 使用 Checkpoint

 @Test
  def checkpoint(): Unit = {
    val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("checkpoint")//這裡設定

    var interimRDD = sc.textFile("dataset/access_log_sample.txt")
      .map(item => (item.split(" ")(0), 1))
      .filter(item => StringUtils.isNotBlank(item._1))
      .reduceByKey((curr, agg) => curr + agg)
    //下面打搭配才是最好的
    interimRDD=interimRDD.cache()
    interimRDD.checkpoint()//這裡(不準確的說他是一個action)
    interimRDD.collect().foreach(println(_))
    sc.stop()
  }