1. 程式人生 > >Spark-RDD特點及RDD運算元

Spark-RDD特點及RDD運算元

目錄

RDD

    RDD全稱是Resilient Distributed Dataset ,彈性分散式資料集

1.五個特性

A list of partitions
A function for computing each partition
A list of dependencies on other RDDs
Optionally, a Partitioner for key-value RDDs
Optionally, a list of preferred locations to compute each split on

RDD是由一系列的partition組成
RDD提供的每一個函式實際上作用在每一個partition上
RDD是有一系列的依賴關係,依賴於其他的RDD
可選項 分割槽器是作用在KV格式的RDD上的
可選項 RDD會提供一系列的最佳的計算位置

RDD依賴關係又被稱為血統(Lineage)

RDD五大特性的解析

Q:RDD依賴關係的作用
A:提高了計算的容錯性。因為RDD的依賴關係,子RDD知道父RDD是誰,但是父RDD不知道子RDD是誰。如果子RDD資料出錯或丟失,子RDD可以基於它的父RDD重新計算獲得資料。
Q:分割槽器的作用?
A:決定資料被哪一個reduce task處理
Q:KV格式的RDD是那種資料型別?
A:如果RDD中的資料是二元組型別的,那麼我們就稱這個RDD是KV格式的RDD。
Q:RDD如何提供最佳的計算位置?
A:RDD會提供一個方法介面,直接呼叫這個方法,就能拿到這個RDD的所有partition的位置。
Q:RDD儲存的是資料嗎?
A:這麼問的一看就是xx,肯定不是啊。RDD儲存的是計算邏輯,與資料庫中的檢視類似,只有觸發的時候才會處理資料。
Q:RDD是通過什麼從HDFS上讀取資料的?
A:RDD本身並沒有讀取資料的方法,RDD依賴的是MR讀檔案的方法,MR在讀檔案之前,會先將檔案劃分成一個個split。正常情況下,split與block存在下列關係:
       split size == block size
       block num ≈ split num
       split num == 第一個RDD的分割槽數
       如果某一個block檔案儲存的資料是上一個block塊最後一條資訊的半條資訊,會出現特殊情況,導致split num ≠ block num,大多數情況下split num == block num。
在這裡插入圖片描述

RDD運算元

    RDD中提供了大量的函式,我們也稱之為運算元。運算元分為兩大類:transformations(延遲執行)、action(觸發執行)

1.transformations類運算元

    transformation類運算元是懶執行的,當遇到action類運算元的時候才會觸發執行。
    transformation類運算元的一大特點是:運算元的返回值還是RDD型別的

運算元 使用 說明
map map (f: T => U) : RDD[U] 輸入一行輸出一行
flatmap flatMap(f: T =>TraversableOnce[U]): RDD[U] 將函式f作用在RDD中每個元素上,並展開(flatten),輸出的每個結果
mapPartitions mapPartitions[U](f: Iterator[Int] => Iterator[U], preservesPartitioning: Boolean): RDD[U] 獲 取 到 每 個 分 區 的 迭 代器,在 函 數 中 通 過 這 個 分 區 整 體 的 迭 代 器 對整 個 分 區 的 元 素 進 行 操 作
mapValues mapValues[U](f: Int => U): RDD[(String, U)] 針對(Key, Value)型資料中的 Value 進行 Map 操作
filter filter(f: T => Boolean):RDD[T] f定義了型別為T的元素是否留下,過濾輸入RDD中的元素,將f返回true的元素留下
coalesce coalesce(numPartitions: Int,b:boolean) : RDD[T] 重新分割槽,分割槽變多一定會發生shuffle
repartition repartition(numPartitions: Int) :RDD[T] 重新分割槽,是coalesce(numPartitons, true) 的簡寫
reduceByKey reduceByKey(func: (V, V) => V): RDD[(K, V)] reduceByKey()對key相同的value進行計算
groupByKey groupByKey(): RDD[(String, Iterable[Int])] 將RDD[key,value] 按照相同的key進行分組,形成RDD[key,Iterable[value]]的形式
union RDD.union(RDD) 將多個RDD合併為一個RDD
zip RDD.zip(RDD) 將兩個RDD組合成Key/Value形式的RDD,如果兩個rdd中的partition數量不一致,會報錯
zipWithUniqueId zipWithUniqueId(): RDD[(Int, Long)] 給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD
zipWitIndex zipWithIndex(): RDD[(Int, Long)] 給RDD中的每一個元素加上一個唯一的索引號,非KV的RDD變成了KV格式的RDD
join RDD.join(RDD) 返回兩個RDD根據K可以關聯上的結果,join只能用於兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可
distinct distinct() 將RDD中的元素進行去重操作
combineByKey combineByKey[C](createCombiner: Int => C, mergeValue: (C, Int) => C, mergeCombiners: (C, C) => C): RDD[(String, C)] 和reduceByKey是相同的效果,是reduceByKey的底層
sortByKey sortByKey(ascending: Boolean, numPartitions: Int): RDD[(String, Int)] 根據key來排序
sortBy sortBy(f: ((String, Int)) => K, ascending: Boolean, numPartitions: Int): RDD[(String, Int)] 指定根據哪一個欄位來排序

2.action類運算元

    當程式執行時,遇到action類運算元,會觸發執行,與前面的transformation類運算元一起執行。

運算元 使用 說明
collect collect(): Array[T] 將資料拉回到drive端,此運算元慎用!
take take(num: Int): Array[T] 返回RDD的前num個元素
count count(): Long 統計RDD中元素個數
foreach foreach(f: T => Unit):Unit 對RDD中每個元素,呼叫函式f
saveAsTextFile saveAsTextFile(path) 函式將資料輸出,儲存到指定目錄
reduce reduce(f: (T, T) => T): T 按照函式f對RDD中元素,進行規約

3.控制類運算元

    將資料落地,儲存再記憶體或磁碟上。

運算元 使用 說明
cache chche () 將RDD的資料持久化到記憶體中。cache是懶執行。相當於persist(StorageLevel.Memory_Only)
persist persist(level) 指定持久化級別,可選擇記憶體、磁碟、持久化、備份