spark-RDD快取,checkpoint機制,有向無環圖,stage
spark-RDD快取,checkpoint機制,有向無環圖,stage
1.RDD依賴關係
-
RDD依賴關係有2種不同型別,窄依賴和寬依賴。
-
窄依賴(narrow dependency):是指每個父RDD的Partition最多被子RDD一個Partition使用。就好像獨生子女一樣。窄依賴的運算元包括:map,filter,flatMap等。如下圖 :1對1 , 多對1
- 寬依賴(wide dependency):多個子RDD的Partition會依賴統一個父RDD的Partition。就好像超生。寬依賴常見運算元包括:reduceByKey,groupBy,groupByKey,sortBy,sortByKey等。 寬依賴會產生shuffle,如下圖: 多對多,1對多
- 相比於寬依賴,窄依賴對優化很有利 ,主要基於以下兩點:
1.寬依賴往往對應著shuffle操作( 多對多,彙總,多節點),需要在執行過程中將同一個父RDD的分割槽傳入到不同的子RDD分割槽中,中間可能涉及多個節點之間的資料傳輸;而窄依賴的每個父RDD的分割槽只會傳入到一個子RDD分割槽中,通常可以在一個節點內完成轉換。 2.當RDD分割槽丟失時(某個節點故障),spark會對資料進行重算。 a. 對於窄依賴,由於父RDD的一個分割槽只對應一個子RDD分割槽,這樣只需要重算和子RDD分割槽對應的父RDD分割槽即可,所以這個重算對資料的利用率是100%的; b. 對於寬依賴,重算的父RDD分割槽對應多個子RDD分割槽,這樣實際上父RDD 中只有一部分的資料是被用於恢復這個丟失的子RDD分割槽的,另一部分對應子RDD的其它未丟失分割槽,這就造成了多餘的計算;更一般的,寬依賴中子RDD分割槽通常來自多個父RDD分割槽,極端情況下,所有的父RDD分割槽都要進行重新計算。
2.lineage(血統)
- 血統就是將RDD與RDD之間依賴關係進行記錄,如果當某個RDD分割槽資料丟失後,可以通過這種記錄下來的關係進行重新計算,恢復得到的資料,這是spark帶的容錯機制。
3.RDD快取
-
我們後期可以把RDD資料快取起來,後續其他的job需要用到該RDD的結果資料,可以直接從快取得到避免重複計算。魂村可以加快資料訪問。
-
RDD設定快取方式有2種:
- cache: 預設把資料儲存到記憶體中,本質是呼叫presist() 預設儲存級別是MEMORY_ONLY
- presist:可以把資料儲存在記憶體或者磁碟中,它內部可以有封裝快取級別,這些快取級別都被定義在一個Object中(StorageLevel中設定儲存種類)
-
進入 spark shell 演示
spark-shell --master spark://1.0.0.155:7077 --executor-memory 1g --total-executor-cores 2
-
cache使用
# 從hdfs讀取 scala> val rdd1 = sc.textFile("/u.txt") # 計入快取 scala> rdd1.cache # 此時檢視http://linux01:4040/Storage/ 是沒有任何快取資訊,這是因為在使用cache時候需要action觸發 scala> rdd1.collect # 可以看到如下圖
![image-20210622111814117](C:\Users\Xu jk\AppData\Roaming\Typora\typora-user-images\image-20210622111814117.png)
# 你可以繼續進行運算元操作 scala> val rdd2 = rdd1.flatMap(_.split(" ")) # 通過觸發action,從快取拿取資料,執行運算元操作 scala> rdd2.collect
當退出spark-shell快取也隨之消失
-
presist使用
# 雖然設定記憶體和磁碟的級別,但儲存資料量較小,是不會分配到磁碟上的。 scala> rdd2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER_2) scala> rdd2.collect # 如果想直接儲存到磁碟,更改級別。 scala> val rdd3 = rdd2.map(x=>(x,1)) scala> rdd3.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY) scala> rdd3.collect
-
從rdd1->rdd2->rdd3-> ..rddn 每一個步驟,如果設定快取它會從快取中拿取資料,而不是通過計算後再執行下一個運算元操作。
-
快取之後生命週期
當任務結束,快取資料也隨之消失
-
快取資料的清除
1.自動清除 程式執行完畢,自動清除 2.手動清除 scala> rdd1.unpersist(true) // 預設為true,表示阻塞刪除
-
關於快取設定應用場景
1.當某個RDD的資料被使用多次,可以設定快取 val rdd1 = sc.textFile("words.txt") rdd1.cache val rdd2=rdd1.flatMap(_.split(" ")) val rdd3=rdd1.map((_,1)) rdd2.collect rdd3.collect 2.當某個RDD它是經過大量複雜運算元操作,計算週期時間很長,將它設定快取。
4.RDD的checkpoint機制
-
當對RDD資料進行快取,儲存在記憶體或磁碟中,後續就可以直接從記憶體或者磁碟中獲取得到,但是不安全。
- cache:在記憶體中,雖然後期操作速度比較快,直接從記憶體中獲取,但是不安全,比如伺服器突然掛掉,或者程序終止,它都會導致資料丟失。
- persist: 它可以儲存資料到磁碟中,雖然速度慢,相對cache安全一點,但也不是特別安全,假如系統管理員誤操作刪除導致磁碟損壞,導致資料丟失。
-
而checkpoint機制它提供一種相對更加可靠資料持久方式,它把資料儲存在分散式檔案系統上,比如HDFS上,它利用HDFS高可用,高容錯(多副本)來保證資料安全性。
-
checkpoint的使用
# hdfs建立checkponit目錄
scala> sc.setCheckpointDir("/checkpoint")
# 此時檢視hdfs 多了一個checkpoint
[root@linux01 data]# hdfs dfs -ls /
drwxr-xr-x - root supergroup 0 2021-06-22 13:18 /checkpoint
# 讀出檔案
scala> val rdd1=sc.textFile("/u.txt")
# 對rdd1進行checkpoint
scala> rdd1.checkpoint
# 運算元操作
scala> val rdd2 = rdd1.flatMap(_.split(" "))
# 觸發action 才會觸發checkpoint
scala> rdd2.collect
# 檢視hdfs儲存檔案,可以看到多了part-00000和part-00001兩個檔案
[root@linux01 data]# hdfs dfs -ls /checkpoint/e5a6cb9f-373c-44ec-8730-7eda0e6067dc/rdd-3
part-00000
part-00001
- 在
http://linux01:4040/jobs/
job任務看到會有2個job任務完成,其中一個就是checkpoint,一個是job任務。
5.cache , presist,checkpoint三者之間區別
cache和presist分別可以把RDD資料快取在記憶體或者本地磁碟,後續要觸發cache和presist持久化操作。需要有一個action,它不會開啟其他新的job,一個action對應一個job。在執行的過程到程式結束後,對應的快取資料就自動消失了。它不會改變RDD的依賴關係。
checkpoint:可以把資料持久寫入hdfs上,後續要觸發checkpoint操作,需要有一個action、任務在執行過程到程式結束之後,對應快取資料不會消失,它會改變rdd的依賴關係。後續資料丟失了不能再通過血統進行資料恢復。
checkpoint操作要執行需要一個action操作,一個action操作對應後續的一個job,該job執行完成之後,它會再次單獨開啟另一個job來執行rdd1.checkpoint操作。
所以checkpoint執行action會開啟2個job,而cache,presist 只會開啟1個job
- 資料恢復順序:
cache -> checkpoint -> 重新計算
6.有向無環圖生成
-
DAG(Directed Acyclic Graph)叫做有向無環圖(有方向,無閉環,代表著資料的流向),原始RDD通過一系列的轉換形成了DAG
-
當我們執行一個單詞統計的job任務時候,登入到:
http://linux01:4040/jobs/
可以檢視到DAG圖,如下圖:
sc.textFile("/u.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
- 該方向就是RDD運算元操作順序,這裡它把DAG圖劃分成了不同的stage(排程階段)。
7.stage是什麼?怎麼劃分
-
stage表示不同的排程階段,一個spark中的job 會對應很多個stage(排程階段)。
-
為什麼要劃分stage?
由於在同一個stage中,沒有寬依賴,都是窄依賴,後期spark的任務是以task執行緒方式去執行的,一個分割槽就對應一個task,在同一個stage中有很多可以並行執行的task。
- 如何劃分stage?
1、拿到DAG有向無環圖之後,從最後一個RDD往前推,首先建立一個stage,然後把當前RDD加入到本stage中。它是最後一個stage。
2、在往前推的過程中,如果遇到窄依賴,就把該RDD加入到stage中,如果遇到寬依賴,就從寬依賴切開,當前一個stage也就結束了。
3、然後重新建立一個新的stage,還是按照第二個步驟往前推,一直到最開始RDD。
- stage與stage之間的關係?
劃分stage之後,每一個stage中有很多可以並行執行的task,後期它會把每個stage中這些可以並行執行的task封裝在一個taskSet集合中。它會把taskSet集合中的task執行緒提交到worker節點上的executor程序中執行。
- 寬依賴是劃分stage的依據,後面stage中task輸入資料是前面stage中task輸出結果資料。