Spark RDD使用詳解5--Action運算元
本質上在Actions運算元中通過SparkContext執行提交作業的runJob操作,觸發了RDD DAG的執行。
根據Action運算元的輸出空間將Action運算元進行分類:無輸出、 HDFS、 Scala集合和資料型別。
無輸出
foreach
對RDD中的每個元素都應用f函式操作,不返回RDD和Array,而是返回Uint。
圖中,foreach運算元通過使用者自定義函式對每個資料項進行操作。 本例中自定義函式為println,控制檯列印所有資料項。
原始碼:
|
HDFS
(1)saveAsTextFile
函式將資料輸出,儲存到HDFS的指定目錄。將RDD中的每個元素對映轉變為(Null,x.toString),然後再將其寫入HDFS。
圖中,左側的方框代表RDD分割槽,右側方框代表HDFS的Block。 通過函式將RDD的每個分割槽儲存為HDFS中的一個Block。
原始碼:
|
(2)saveAsObjectFile
saveAsObjectFile將分割槽中的每10個元素組成一個Array,然後將這個Array序列化,對映為(Null,BytesWritable(Y))的元素,寫入HDFS為SequenceFile的格式。
圖中,左側方框代表RDD分割槽,右側方框代表HDFS的Block。 通過函式將RDD的每個分割槽儲存為HDFS上的一個Block。
原始碼:
|
Scala集合和資料型別
(1)collect
collect相當於toArray,toArray已經過時不推薦使用,collect將分散式的RDD返回為一個單機的scala Array陣列。 在這個陣列上運用scala的函式式操作。
圖中,左側方框代表RDD分割槽,右側方框代表單機記憶體中的陣列。通過函式操作,將結果返回到Driver程式所在的節點,以陣列形式儲存。
原始碼:
|
(2)collectAsMap
collectAsMap對(K,V)型的RDD資料返回一個單機HashMap。對於重複K的RDD元素,後面的元素覆蓋前面的元素。
圖中,左側方框代表RDD分割槽,右側方框代表單機陣列。資料通過collectAsMap函式返回給Driver程式計算結果,結果以HashMap形式儲存。
原始碼:
|
(3)reduceByKeyLocally
實現的是先reduce再collectAsMap的功能,先對RDD的整體進行reduce操作,然後再收集所有結果返回為一個HashMap。
原始碼:
|
(4)lookup
Lookup函式對(Key,Value)型的RDD操作,返回指定Key對應的元素形成的Seq。這個函式處理優化的部分在於,如果這個RDD包含分割槽器,則只會對應處理K所在的分割槽,然後返回由(K,V)形成的Seq。如果RDD不包含分割槽器,則需要對全RDD元素進行暴力掃描處理,搜尋指定K對應的元素。
圖中,左側方框代表RDD分割槽,右側方框代表Seq,最後結果返回到Driver所在節點的應用中。
原始碼:
|
(5)count
count返回整個RDD的元素個數。
圖中,返回資料的個數為5。一個方塊代表一個RDD分割槽。
原始碼
|
(6)top
top可返回最大的k個元素。
相近函式說明:
- top返回最大的k個元素。
- take返回最小的k個元素。
- takeOrdered返回最小的k個元素, 並且在返回的陣列中保持元素的順序。
- first相當於top( 1) 返回整個RDD中的前k個元素, 可以定義排序的方式Ordering[T]。返回的是一個含前k個元素的陣列。
|
(7)reduce
reduce函式相當於對RDD中的元素進行reduceLeft函式的操作。
reduceLeft先對兩個元素
|
(8)fold
fold和reduce的原理相同,但是與reduce不同,相當於每個reduce時,迭代器取的第一個元素是zeroValue。
圖中,通過使用者自定義函式進行fold運算,圖中的一個方框代表一個RDD分割槽。
原始碼:
|
(9)aggregate
aggregate先對每個分割槽的所有元素進行aggregate操作,再對分割槽的結果進行fold操作。
aggreagate與fold和reduce的不同之處在於,aggregate相當於採用歸併的方式進行資料聚集,這種聚集是並行化的。 而在fold和reduce函式的運算過程中,每個分割槽中需要進行序列處理,每個分割槽序列計算完結果,結果再按之前的方式進行聚集,並返回最終聚集結果。
圖中,通過使用者自定義函式對RDD 進行aggregate的聚集操作,圖中的每個方框代表一個RDD分割槽。
原始碼:
|