1. 程式人生 > 其它 >spark-RDD快取,checkpoint機制,有向無環圖,stage

spark-RDD快取,checkpoint機制,有向無環圖,stage

spark-RDD快取,checkpoint機制,有向無環圖,stage

1.RDD依賴關係

  • RDD依賴關係有2種不同型別,窄依賴和寬依賴。

  • 窄依賴(narrow dependency):是指每個父RDD的Partition最多被子RDD一個Partition使用。就好像獨生子女一樣。窄依賴的運算元包括:map,filter,flatMap等。如下圖 :1對1 , 多對1

  • 寬依賴(wide dependency):多個子RDD的Partition會依賴統一個父RDD的Partition。就好像超生。寬依賴常見運算元包括:reduceByKey,groupBy,groupByKey,sortBy,sortByKey等。 寬依賴會產生shuffle,如下圖: 多對多,1對多
  • 相比於寬依賴,窄依賴對優化很有利 ,主要基於以下兩點:
1.寬依賴往往對應著shuffle操作( 多對多,彙總,多節點),需要在執行過程中將同一個父RDD的分割槽傳入到不同的子RDD分割槽中,中間可能涉及多個節點之間的資料傳輸;而窄依賴的每個父RDD的分割槽只會傳入到一個子RDD分割槽中,通常可以在一個節點內完成轉換。

2.當RDD分割槽丟失時(某個節點故障),spark會對資料進行重算。
	a. 對於窄依賴,由於父RDD的一個分割槽只對應一個子RDD分割槽,這樣只需要重算和子RDD分割槽對應的父RDD分割槽即可,所以這個重算對資料的利用率是100%的;
	b. 對於寬依賴,重算的父RDD分割槽對應多個子RDD分割槽,這樣實際上父RDD 中只有一部分的資料是被用於恢復這個丟失的子RDD分割槽的,另一部分對應子RDD的其它未丟失分割槽,這就造成了多餘的計算;更一般的,寬依賴中子RDD分割槽通常來自多個父RDD分割槽,極端情況下,所有的父RDD分割槽都要進行重新計算。

2.lineage(血統)

  • 血統就是將RDD與RDD之間依賴關係進行記錄,如果當某個RDD分割槽資料丟失後,可以通過這種記錄下來的關係進行重新計算,恢復得到的資料,這是spark帶的容錯機制。

3.RDD快取

  • 我們後期可以把RDD資料快取起來,後續其他的job需要用到該RDD的結果資料,可以直接從快取得到避免重複計算。魂村可以加快資料訪問。

  • RDD設定快取方式有2種:

    1. cache: 預設把資料儲存到記憶體中,本質是呼叫presist() 預設儲存級別是MEMORY_ONLY
    2. presist:可以把資料儲存在記憶體或者磁碟中,它內部可以有封裝快取級別,這些快取級別都被定義在一個Object中(StorageLevel中設定儲存種類)
  • 進入 spark shell 演示

    spark-shell --master spark://1.0.0.155:7077 --executor-memory 1g --total-executor-cores 2
    
  • cache使用

    # 從hdfs讀取
    scala> val rdd1 = sc.textFile("/u.txt")
    # 計入快取
    scala> rdd1.cache
    # 此時檢視http://linux01:4040/Storage/ 是沒有任何快取資訊,這是因為在使用cache時候需要action觸發
    scala> rdd1.collect
    # 可以看到如下圖
    

    ![image-20210622111814117](C:\Users\Xu jk\AppData\Roaming\Typora\typora-user-images\image-20210622111814117.png)

    # 你可以繼續進行運算元操作
    scala> val rdd2 = rdd1.flatMap(_.split(" "))
    # 通過觸發action,從快取拿取資料,執行運算元操作
    scala> rdd2.collect
    

    當退出spark-shell快取也隨之消失

  • presist使用

    # 雖然設定記憶體和磁碟的級別,但儲存資料量較小,是不會分配到磁碟上的。
    scala> rdd2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER_2)
    scala> rdd2.collect
    # 如果想直接儲存到磁碟,更改級別。
    scala> val rdd3 = rdd2.map(x=>(x,1))
    scala> rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
    scala> rdd3.collect
    
  • 從rdd1->rdd2->rdd3-> ..rddn 每一個步驟,如果設定快取它會從快取中拿取資料,而不是通過計算後再執行下一個運算元操作。

  • 快取之後生命週期

    當任務結束,快取資料也隨之消失
    
  • 快取資料的清除

    1.自動清除
    	程式執行完畢,自動清除
    2.手動清除
    	scala> rdd1.unpersist(true) // 預設為true,表示阻塞刪除
    
  • 關於快取設定應用場景

    1.當某個RDD的資料被使用多次,可以設定快取
    	val rdd1 = sc.textFile("words.txt")
    	rdd1.cache
    	val rdd2=rdd1.flatMap(_.split(" "))
    	val rdd3=rdd1.map((_,1))
    	rdd2.collect
    	rdd3.collect
    2.當某個RDD它是經過大量複雜運算元操作,計算週期時間很長,將它設定快取。
    

4.RDD的checkpoint機制

  • 當對RDD資料進行快取,儲存在記憶體或磁碟中,後續就可以直接從記憶體或者磁碟中獲取得到,但是不安全。

    • cache:在記憶體中,雖然後期操作速度比較快,直接從記憶體中獲取,但是不安全,比如伺服器突然掛掉,或者程序終止,它都會導致資料丟失。
    • persist: 它可以儲存資料到磁碟中,雖然速度慢,相對cache安全一點,但也不是特別安全,假如系統管理員誤操作刪除導致磁碟損壞,導致資料丟失。
  • 而checkpoint機制它提供一種相對更加可靠資料持久方式,它把資料儲存在分散式檔案系統上,比如HDFS上,它利用HDFS高可用,高容錯(多副本)來保證資料安全性。

  • checkpoint的使用

# hdfs建立checkponit目錄
scala> sc.setCheckpointDir("/checkpoint")
# 此時檢視hdfs 多了一個checkpoint
[root@linux01 data]# hdfs dfs -ls /
drwxr-xr-x   - root supergroup          0 2021-06-22 13:18 /checkpoint
# 讀出檔案
scala> val rdd1=sc.textFile("/u.txt")
# 對rdd1進行checkpoint
scala> rdd1.checkpoint
# 運算元操作
scala> val rdd2 = rdd1.flatMap(_.split(" "))
# 觸發action 才會觸發checkpoint
scala> rdd2.collect
# 檢視hdfs儲存檔案,可以看到多了part-00000和part-00001兩個檔案
[root@linux01 data]# hdfs dfs -ls /checkpoint/e5a6cb9f-373c-44ec-8730-7eda0e6067dc/rdd-3
	part-00000
	part-00001
  • http://linux01:4040/jobs/ job任務看到會有2個job任務完成,其中一個就是checkpoint,一個是job任務。

5.cache , presist,checkpoint三者之間區別

cache和presist分別可以把RDD資料快取在記憶體或者本地磁碟,後續要觸發cache和presist持久化操作。需要有一個action,它不會開啟其他新的job,一個action對應一個job。在執行的過程到程式結束後,對應的快取資料就自動消失了。它不會改變RDD的依賴關係。

checkpoint:可以把資料持久寫入hdfs上,後續要觸發checkpoint操作,需要有一個action、任務在執行過程到程式結束之後,對應快取資料不會消失,它會改變rdd的依賴關係。後續資料丟失了不能再通過血統進行資料恢復。
	checkpoint操作要執行需要一個action操作,一個action操作對應後續的一個job,該job執行完成之後,它會再次單獨開啟另一個job來執行rdd1.checkpoint操作。
	
所以checkpoint執行action會開啟2個job,而cache,presist 只會開啟1個job
  • 資料恢復順序:
cache -> checkpoint -> 重新計算

6.有向無環圖生成

  • DAG(Directed Acyclic Graph)叫做有向無環圖(有方向,無閉環,代表著資料的流向),原始RDD通過一系列的轉換形成了DAG

  • 當我們執行一個單詞統計的job任務時候,登入到:http://linux01:4040/jobs/可以檢視到DAG圖,如下圖:

sc.textFile("/u.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
  • 該方向就是RDD運算元操作順序,這裡它把DAG圖劃分成了不同的stage(排程階段)。

7.stage是什麼?怎麼劃分

  • stage表示不同的排程階段,一個spark中的job 會對應很多個stage(排程階段)。

  • 為什麼要劃分stage?

由於在同一個stage中,沒有寬依賴,都是窄依賴,後期spark的任務是以task執行緒方式去執行的,一個分割槽就對應一個task,在同一個stage中有很多可以並行執行的task。
  • 如何劃分stage?
1、拿到DAG有向無環圖之後,從最後一個RDD往前推,首先建立一個stage,然後把當前RDD加入到本stage中。它是最後一個stage。
2、在往前推的過程中,如果遇到窄依賴,就把該RDD加入到stage中,如果遇到寬依賴,就從寬依賴切開,當前一個stage也就結束了。
3、然後重新建立一個新的stage,還是按照第二個步驟往前推,一直到最開始RDD。
  • stage與stage之間的關係?
劃分stage之後,每一個stage中有很多可以並行執行的task,後期它會把每個stage中這些可以並行執行的task封裝在一個taskSet集合中。它會把taskSet集合中的task執行緒提交到worker節點上的executor程序中執行。
  • 寬依賴是劃分stage的依據,後面stage中task輸入資料是前面stage中task輸出結果資料。