Spark核心:RDD的運算元
RDD的運算元
一般來說,RDD的整個計算過程都是發生在Worker節點中的Executor中的。RDD可以支援三種操作型別:Transformation、Action以及Persist和CheckPoint為代表的控制型別操作。
RDD一般是從外部資料來源讀取資料的,經過多次的Transformation(中間應該有Persist和CheckPoint操作),最終通過Action型別的操作將結果寫入到外部儲存系統。在Spark語言裡,CheckPoint用過將RDD寫入Disk做檢查點,是Spark Lineage容錯的輔助,但是Lineage過長的話會讓容錯的成本變高,這時候在中間階段做檢查點容錯,如果之後的所有節點出現問題,可以從檢查點的RDD開始重新做Lineage,這樣可以減少開銷。
CheckPoint主要適用兩種情況:
- DAG中的Lineage過長,如果重新算的話開銷非常大,例如PageRank、ALS等
適合與在寬依賴上做CheckPoint,這樣可以避免為Lineage重新計算而帶來的冗餘計算
預設情況下,每個Transformation過的RDD會在每次Action的時候重新計算一次,然而可以視同Persist(Cache)持久化一個RDD到記憶體中,進行復用。
RDD的三種操作:
- Transformation ——惰性執行
- Action —— 觸發作業
- 操作 持久化
Transformation
Transformation是一種演算法的描述。
標記著需要進行操作的資料,但不是真的進行執行。
Transformation具有Lazy特性,操作是延遲計算的。也就是說一個RDD轉換成另一個RDD的操作不是馬上執行的,需要等到Actions或者CheckPoint時,才會真的觸發操作。
Spark會從後往前順著Lineage回溯並劃分Stage,檢視那些RDD觸發了計算,從後往前推,這就是Transformation的Lazy特性,這樣一個特性有一個很大的好處就是不需要結果是就不讓這個作業進行計算,在需要時進行計算,這樣可以避免很多不必要的中間臨時資料,這比較符合分散式的平行計算需求。
從另一個層面來講,最後一步計算時候,可以看到前面所有步驟,看見的步驟越多,進行優化的機會也就越多,所以Spark是基於Lazy特性進行操作的,基於Lineage來構建整個排程系統,最終形成了DAG。
一些Transformation操作
從HDFS中建立RDD
從記憶體中建立RDD
先定義一個數組
通過parallelize方法建立ParallelCollectionRDD
從其他RDD建立新的RDD
filter函式將distData RDD轉換成新的RDD
出發action操作,檢視過濾後的內容
注意collection只適合資料量少的時候用
注意,Transformation之後不會馬上進行操作,要知道出發action才會執行
例如distData.filter(e=>e>2) transformation後,它不會立即執行,合適等到distDataFilter.collect方法執行時才被執行
Action
reduce
count()
first()
take(n)
takeSample()
takeOrdered()
savaAsTextFile()
等操作都是action操作,會實際執行的。