s27賽季限定返場突變,留1w戰令幣巨賺,諸葛亮笑了,5折商店上線
二、掌握spark RDD的概念、運算元的作用和使用(包括建立和各種轉換運算,具體到程式碼的編寫使用),不同共享變數的作用和使用、對於RDD的依賴關係要理解,知道持久化的方法以及型別;
1、Spark RDD的概念
RDD是可擴充套件的彈性分散式資料集(一種容錯的並行資料結構);
是隻讀、分割槽且不變的資料集合;
是Spark的基石,也是Spark的靈魂;
是一種分散式的記憶體抽象,不具備Schema的資料結構(可以基於任何資料結構建立,如tuple(元組)、dict(字典)和list(列表))
RDD的五個主要屬性:
(1)分割槽資訊(Partition)(2)自定義分片計算(3)RDD之間相互依賴(4)控制分片數量(5)使用列表方式進行塊儲存
4、共享變數
(1)累加器(Accumulator)
累加器是一個全域性的共享變數,累加器可以很好地解決上述程式的閉包問題。使用累加器完成相同的功能,程式碼如下:
sum = sc.accumulator(0) # 建立一個累加器,初值為0 def fn1(x): global sum sum += x # 注意這裡不能是 sum=sum+x,因為+=是原地操作,+是需要兩個變數型別一致。 a_rdd = sc.parallelize([1, 2, 3, 4, 5]) a_rdd.foreach(fn1) print(sum.value) # sum.value可以獲取累加器的值,此時列印輸出的是15
累加器是一個write-only的變數,工作節點worker中的task無法讀取這個值,只能在驅動程式中使用value方法來讀取累加器的值。
(2)廣播變數(Broadcast)
廣播變數和累加器類似,也是一個共享變數,廣播變數能夠以一種更有效率的方式將一個大資料量輸入集合的副本分配給每個節點。
SparkContext物件的broadcast方法可以建立廣播變數,廣播變數的value屬性可以獲取該廣播變數的值,unpersist方法可以在執行程式上刪除此廣播的快取副本。destroy方法可以銷燬廣播變數,一旦廣播變數被銷燬,就不能再使用了。
>>> b = sc.broadcast(10) # 建立一個廣播物件 >>> b.value # 獲取廣播物件的值 10 >>> sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * b.value).collect() [10, 20, 30, 40, 50] >>> b.destroy() # 銷燬廣播變數,銷燬後就不能訪問它的value了 >>> b.value # 但是pyspark中還是能訪問到這個值,這是pyspark的問題,如果是scala確實是無法訪問它的值了 10 >>> sc.parallelize([1, 2, 3, 4, 5]).map(lambda x: x * b.value).collect() # task中確實無法訪問該廣播變數的值了
5、依賴問題
(1)RDD只能基於在穩定物理儲存中的資料集和其他已有的RDD上執行確定性操作來建立。
(2)RDD在血統依賴方面,分為窄依賴和寬依賴。他們用來解決資料容錯的高效性。
窄依賴:
一個父RDD的分割槽partition最多被子RDD的一個分割槽使用(獨生子女)。
在一個叢集節點上管道式執行。
比如map、filter、union等;
寬依賴:
多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle(超生)。
比如groupByKey、reduceByKey、 sortBy、partitionBy等;
注意:一個RDD對不同的父節點可能有不同的依賴方式,可能對父節點1是寬依賴,對父節點2是窄依賴。
shuffle:Spark 裡的某些操作會觸發 shuffle,shuffle 是spark 重新分配資料的一種機制,使得這些資料可以跨不同的區域進行分組。
DAG:Spark裡的每一個轉換操作都會生成一個新的RDD,RDD之間連一條邊,最後這些RDD和他們之間的邊組成一個有向無環圖DAG(Directed Acyclic Graph)。
一個Stage的開始就是從外部儲存或者shuffle結果中讀取資料;一個Stage的結束就是發生shuffle或者生成結果時。
spark劃分stage的整體思路是:從後往前推,遇到寬依賴就斷開,劃分為一個stage;遇到窄依賴就將這個RDD加入該stage中。
6、RDD的持久化
可以使用persist方法和cache方法,cache方法只能快取在記憶體中, persist方法可以快取在磁碟上或者記憶體中。
is_cached屬性可以檢視當前RDD的持久化狀態,或者使用getStorageLevel方法獲取當前RDD的持久化狀態,unpersist方法可以解除RDD的持久化
StorageLevel****型別 | 型別描述 | 對應的****useDisk, useMemory, deserialized, off_heap, replication |
---|---|---|
MEMORY_ONLY | (預設級別)將RDD以JAVA物件的形式儲存到JVM記憶體。如果分片太大,記憶體快取不下,就不快取 | StorageLevel(False, True, False, False, 1) |
MEMORY_ONLY_2 | (預設級別)將RDD以JAVA物件的形式儲存到JVM記憶體。如果分片太大,記憶體快取不下,就不快取,將分割槽複製到兩個叢集節點上 | StorageLevel(False, True, False, False, 2) |
MEMORY_ONLY_SER | 將RDD以序列化的JAVA物件形式儲存到記憶體 | StorageLevel(False, True, False, False, 1) |
MEMORY_ONLY_SER_2 | 將RDD以序列化的JAVA物件形式儲存到記憶體,將分割槽複製到兩個叢集節點上 | StorageLevel(False, True, False, False, 2) |
DISK_ONLY | 將RDD持久化到硬碟 | StorageLevel(True, False, False, False, 1) |
DISK_ONLY_2 | 將RDD持久化到硬碟,將分割槽複製到兩個叢集節點上 | StorageLevel(True, False, False, False, 2) |
MEMORY_AND_DISK | 將RDD資料集以JAVA物件的形式儲存到JVM記憶體中,如果分片太大不能儲存到記憶體中,則儲存到磁碟上,下次用時重新從磁碟讀取 | StorageLevel(True, True, False, False, 1) |
MEMORY_AND_DISK_2 | 將RDD資料集以JAVA物件的形式儲存到JVM記憶體中,如果分片太大不能儲存到記憶體中,則儲存到磁碟上,下次用時重新從磁碟讀取,並將分割槽複製到兩個叢集節點上 | StorageLevel(True, True, False, False, 2) |
MEMORY_AND_DISK_SER | 與MEMORY_ONLY_SER類似,但當分片太大,不能儲存到記憶體中,會將其儲存到磁碟中 | StorageLevel(True, True, False, False, 1) |
MEMORY_AND_DISK_SER_2 | 與MEMORY_ONLY_SER類似,但當分片太大,不能儲存到記憶體中,會將其儲存到磁碟中,將分割槽複製到兩個叢集節點上 | StorageLevel(True, True, False, False, 2) |
OFF_HEAP | 是否利用java unsafe API實現的記憶體管理,RDD實際被儲存到Tachyon | StorageLevel(True, True, True, False, 1) |
檢查點 checkpoint
通過cache或者persist將RDD持久化到記憶體或者磁碟中,這樣做並不能保證資料完全不會丟失,當資料丟失的時候,Spark會根據RDD的計算流程DGA重新計算一遍,這樣子就很費效能,
checkpoint的作用就是將DAG中比較重要的中間資料做一個檢查點將結果儲存到一個高可用的地方(通常這個地方就是HDFS裡面,當然也可以是本地檔案系統)。