Spark-RDD特點及RDD運算元
目錄
RDD
RDD全稱是Resilient Distributed Dataset ,彈性分散式資料集
1.五個特性
A list of partitions A function for computing each partition A list of dependencies on other RDDs Optionally, a Partitioner for key-value RDDs Optionally, a list of preferred locations to compute each split on
RDD是由一系列的partition組成
RDD提供的每一個函式實際上作用在每一個partition上
RDD是有一系列的依賴關係,依賴於其他的RDD
可選項 分割槽器是作用在KV格式的RDD上的
可選項 RDD會提供一系列的最佳的計算位置
RDD依賴關係又被稱為血統(Lineage)
RDD五大特性的解析
Q:RDD依賴關係的作用
A:提高了計算的容錯性。因為RDD的依賴關係,子RDD知道父RDD是誰,但是父RDD不知道子RDD是誰。如果子RDD資料出錯或丟失,子RDD可以基於它的父RDD重新計算獲得資料。
Q:分割槽器的作用?
A:決定資料被哪一個reduce task處理
Q:KV格式的RDD是那種資料型別?
A:如果RDD中的資料是二元組型別的,那麼我們就稱這個RDD是KV格式的RDD。
Q:RDD如何提供最佳的計算位置?
A:RDD會提供一個方法介面,直接呼叫這個方法,就能拿到這個RDD的所有partition的位置。
Q:RDD儲存的是資料嗎?
A:這麼問的一看就是xx,肯定不是啊。RDD儲存的是計算邏輯,與資料庫中的檢視類似,只有觸發的時候才會處理資料。
Q:RDD是通過什麼從HDFS上讀取資料的?
A:RDD本身並沒有讀取資料的方法,RDD依賴的是MR讀檔案的方法,MR在讀檔案之前,會先將檔案劃分成一個個split。正常情況下,split與block存在下列關係:
split size == block size
block num ≈ split num
split num == 第一個RDD的分割槽數
如果某一個block檔案儲存的資料是上一個block塊最後一條資訊的半條資訊,會出現特殊情況,導致split num ≠ block num,大多數情況下split num == block num。
RDD運算元
RDD中提供了大量的函式,我們也稱之為運算元。運算元分為兩大類:transformations(延遲執行)、action(觸發執行)
1.transformations類運算元
transformation類運算元是懶執行的,當遇到action類運算元的時候才會觸發執行。
transformation類運算元的一大特點是:運算元的返回值還是RDD型別的
運算元 | 使用 | 說明 |
---|---|---|
map | map (f: T => U) : RDD[U] | 輸入一行輸出一行 |
flatmap | flatMap(f: T =>TraversableOnce[U]): RDD[U] | 將函式f作用在RDD中每個元素上,並展開(flatten),輸出的每個結果 |
mapPartitions | mapPartitions[U](f: Iterator[Int] => Iterator[U], preservesPartitioning: Boolean): RDD[U] | 獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作 |
mapValues | mapValues[U](f: Int => U): RDD[(String, U)] | 針對(Key, Value)型資料中的 Value 進行 Map 操作 |
filter | filter(f: T => Boolean):RDD[T] | f定義了型別為T的元素是否留下,過濾輸入RDD中的元素,將f返回true的元素留下 |
coalesce | coalesce(numPartitions: Int,b:boolean) : RDD[T] | 重新分割槽,分割槽變多一定會發生shuffle |
repartition | repartition(numPartitions: Int) :RDD[T] | 重新分割槽,是coalesce(numPartitons, true) 的簡寫 |
reduceByKey | reduceByKey(func: (V, V) => V): RDD[(K, V)] | reduceByKey()對key相同的value進行計算 |
groupByKey | groupByKey(): RDD[(String, Iterable[Int])] | 將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式 |
union | RDD.union(RDD) | 將多個RDD合併為一個RDD |
zip | RDD.zip(RDD) | 將兩個RDD組合成Key/Value形式的RDD,如果兩個rdd中的partition數量不一致,會報錯 |
zipWithUniqueId | zipWithUniqueId(): RDD[(Int, Long)] | 給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD |
zipWitIndex | zipWithIndex(): RDD[(Int, Long)] | 給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD |
join | RDD.join(RDD) | 返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 |
distinct | distinct() | 將RDD中的元素進行去重操作 |
combineByKey | combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] | 和reduceByKey是相同的效果,是reduceByKey的底層 |
sortByKey | sortByKey(ascending: Boolean, numPartitions: Int): RDD[(String, Int)] | 根據key來排序 |
sortBy | sortBy(f: ((String, Int)) => K, ascending: Boolean, numPartitions: Int): RDD[(String, Int)] | 指定根據哪一個欄位來排序 |
2.action類運算元
當程式執行時,遇到action類運算元,會觸發執行,與前面的transformation類運算元一起執行。
運算元 | 使用 | 說明 |
---|---|---|
collect | collect(): Array[T] | 將資料拉回到drive端,此運算元慎用! |
take | take(num: Int): Array[T] | 返回RDD的前num個元素 |
count | count(): Long | 統計RDD中元素個數 |
foreach | foreach(f: T => Unit):Unit | 對RDD中每個元素,呼叫函式f |
saveAsTextFile | saveAsTextFile(path) | 函式將資料輸出,儲存到指定目錄 |
reduce | reduce(f: (T, T) => T): T | 按照函式f對RDD中元素,進行規約 |
3.控制類運算元
將資料落地,儲存再記憶體或磁碟上。
運算元 | 使用 | 說明 |
---|---|---|
cache | chche () | 將RDD的資料持久化到記憶體中。cache是懶執行。相當於persist(StorageLevel.Memory_Only) |
persist | persist(level) | 指定持久化級別,可選擇記憶體、磁碟、持久化、備份 |