1. 程式人生 > >Rdd詳解

Rdd詳解

RDD 詳解

1. RDD概述

RDD(Resilient Distributed Dataset)叫做分散式資料集,是Spark中最基本的資料抽象,它代表一個不可變、可分割槽、裡面的元素可平行計算的集合。RDD具有資料流模型的特點:自動容錯、位置感知性排程和可伸縮性。RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度。

2. RDD屬性

2.1 分片: 即資料集的基本組成單位。對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度。使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值。預設值就是程式所分配到的CPU Core的數目。

2.2 函式: Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的。compute函式會對迭代器進行復合,不需要儲存每次計算的結果。

2.3 依賴關係: RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係。在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算。

2.4 RDD的分片函式(Partitioner):
當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另外一個是基於範圍的RangePartitioner。只有對於於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量。

3. RDD分割槽

分割槽是RDD內部平行計算的一個計算單元,RDD的資料集在邏輯上被劃分為多個分片,每一個分片稱為分割槽,分割槽的格式決定了平行計算的粒度,而每個分割槽的數值計算都是在一個任務中進行的,因此任務的個數,也是由RDD(準確來說是作業最後一個RDD)的分割槽數決定。

RDD分割槽的一個分割槽原則:儘可能是分割槽的個數等於叢集核數目

無論是本地模式、Standalone模式、YARN模式或Mesos模式,我們都可以通過spark.default.parallelism來配置其預設分割槽個數,若沒有設定該值,則根據不同的叢集環境確定該值,本地模式:預設為本地機器的CPU數目,若設定了local[N],則預設為N,Apache Mesos:預設的分割槽數為8,Standalone或YARN:預設取叢集中所有核心數目的總和,或者2,取二者的較大值。

4. RDD運算元

4.1 RDD中的所有轉換都是延遲載入的,也就是說,它們並不會直接計算結果。相反的,它們只是記住這些應用到基礎資料集(例如一個檔案)上的轉換動作。只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行。這種設計讓Spark更加有效率地執行。

4.2 常用的Transformation

RDD 建立
方式一:
val data = Array(3,2,54,62,4)
val rdd = sc.parallelize(data)
方式二:
val rdd1 = sc.textFile(“c://data.txt”) //指定本地目錄下檔案
val rdd1 = sc.textFile(“hdfs://master:9000/input/ data.txt”) //指定HDFS檔案
val rdd1 = sc.textFile("/input /*.txt") //含萬用字元的路徑

map(fun)
fun這個函式用於RDD中的每一個元素,將函式的返回結果作為結果RDD中對應元素

mapPartitions(fun)
rdd的mapPartitions是map的一個變種,它們都可進行分割槽的並行處理。兩者的主要區別是呼叫的粒度不一樣:map的輸入變換函式是應用於RDD中每個元素,而mapPartitions的輸入函式是應用於每個分割槽,假設一個rdd有10個元素,分成3個分割槽。如果使用map方法,map中的輸入函式會被呼叫10次;而使用mapPartitions方法的話,其輸入函式會只會被呼叫3次,每個分割槽呼叫1次在大資料集情況下的資源初始化開銷和批處理處理,都要初始化一個耗時的資源,然後使用,比如資料庫連線。mapPartitons的開銷要小很多,也便於進行批處理操作。

mapValues [Pair]相當於是map之對key-value 的value進行處理
mapValues 用於處理key-value型別的RDD,每次處理一個key-value對mapValues 處理的是key-value中的value,處理完value之後,
就會返回一個key-value型別的資料,返回到新RDD中去
val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”,“panther”, “eagle”,“cat”,“cat”), 2)
val b = a.map(x=>(x,x.length))
b.mapValues(x=>x+10).collect
res8: Array[(String, Int)] = Array((dog,13), (tiger,15), (lion,14), (cat,13), (panther,17), (eagle,15), (cat,13), (cat,13))

keys
keys 獲取RDD中元組的key,這些key可以重複出現
val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”,“cat”,“cat”), 2)
val b = a.map(x=>(x,x.length))
b.keys.collect
res9: Array[String] = Array(dog, tiger, lion, cat, panther, eagle, cat, cat)

filter(func)
將RDD滿足該函式的元素放入新的RDD中返回

**flatMap(func)**方法可以實現對每個輸入元素生成多個輸出元素,返回一個返回值序列的迭代器。其一個簡單用途就是把輸入的字串切分為單詞。
val lines=sc.parallelize(List(“hello word”,“hi”,“I’m back”))
val words=lines.flatMap(line=>line.split(" "))
words.collect

union(otherDataset)
會返回一個包含兩個RDD中所有元素的RDD,包含重複資料。
rdd1.union(rdd2)

distinct([numTasks]))
生成一個只包含不同元素的一個新的RDD。開銷很大
rdd1.diistinct

intersection(otherRDD) (注意全部是小寫)
只返回兩個RDD中都有的元素。可能會去掉所有的重複元素

subtract(otherRDD)
返回只存在第一個RDD中而不存在第二個RDD中的所有的元素組成的RDD。也需要網路混洗

join(otherDataset, [numTasks])

groupBy(fun)
groupBy 將RDD中的資料按照指定的函式和分割槽數量,來進行分組。
val a = sc.parallelize(1 to 9, 3)
//groupBy的第一個引數是一個函式,用於指定分組條件。分類標籤由條件返回值給定
//這裡會根據條件返回 “even” 和 “odd”
a.groupBy(x => { if (x % 2 == 0) “even” else “odd” }).collect
res1: Array((even,CompactBuffer(2, 8, 4, 6)), (odd,CompactBuffer(5, 1, 3, 7, 9)))

//這裡的返回標籤為 0 ,1 ,2
a.groupBy(x =>(x % 3)).collect
res2:Array((0,CompactBuffer(3, 9, 6)), (1,CompactBuffer(4, 1, 7)), (2,CompactBuffer(2, 8, 5)))
//groupBy中的第二個引數是指定,分組後將結果儲存在幾個分割槽中,預設分割槽數量和RDD元素分割槽數量相等
a.groupBy(x => myfunc(x), 3).collect
res2: Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))

groupByKey([numTasks])
groupByKey 和 groupBy 非常相似,不提供函式功能,只是按照key來進行分組,相同的key分在一組,相比於groupBy 要簡單
val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “spider”, “eagle”), 2)
//生成一個以單詞長度作為key,單詞作為value的 元組
val b = a.keyBy(_.length)
//groupByKey不提供函式功能,直接按照Key進行分類
b.groupByKey.collect
res1: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))

sortBy(func,[ascending], [numTasks])
對RDD進行排序
第一個引數是一個函式,該函式的也有一個帶T泛型的引數,返回型別和RDD中元素的型別是一致的;
第二個引數是ascending,從字面的意思大家應該可以猜到,是的,這引數決定排序後RDD中的元素是升序還是降序,預設是true,也就是升序;
第三個引數是numPartitions,該引數決定排序後的RDD的分割槽個數,預設排序後的分割槽個數和排序之前的個數相等,即為this.partitions.size。

sortByKey([ascending], [numTasks])
Key-Value形式的RDD,並對Key進行排序

4.3 Action

reduce(fun)
reduce將RDD中元素兩兩傳遞給輸入函式,同時產生一個新的值,新產生的值與RDD中下一個元素再被傳遞給輸入函式直到最後只有一個值為止。
val rdd=sc.parallelize(List(1,2,3,3))
val sum=rdd.reduce((x,y)=>x+y) 求和

collect()
一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組

reduceByKey(fun)
先根據相同的 key 進行分組,然後在相同的 key 中進行聚合,聚合形式和 reduce 相同
reduceByKey 先根據相同的 key 進行分組,然後在相同的 key 中進行聚合,聚合形式和 reduce 相同
val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”,“cat”,“cat”), 2)
val b = a.map(x=>(x,x.length))
b.reduceByKey(+).collect
res0: Array[(String, Int)] = Array((panther,7), (dog,3), (eagle,5), (lion,4), (cat,9), (tiger,5))

val rdd1 = sc.parallelize(List((“tom”, 1), (“jerry”, 3), (“kitty”, 2), (“shuke”, 1)))
val rdd2 = sc.parallelize(List((“jerry”, 2), (“tom”, 3), (“shuke”, 2), (“kitty”, 5)))
val rdd3 = rdd1.union(rdd2)
//按key進行聚合
val rdd4 = rdd3.reduceByKey(_ + _)
rdd4.collect
//按value的降序排序
val rdd5 = rdd4.map(t => (t._2, t._1)).sortByKey(false).map(t => (t._2, t._1))
rdd5.collect

count()
返回是dataset中的element的個數

first()
返回的是dataset的第一個元素

take(n)
返回前n個elements

saveAsTextFile(path)
把dataset寫到一個textfile中,或者hdfs

saveAsSequenceFile(path)
只能用在key-value對上,然後生成SequenceFile寫到本地或者hadoop檔案系統

saveAsObjectFile(path)
把dataset寫到一個java序列化的檔案中,用SparkContext。objectFile()裝載

countByKey()
返回的是key對應的個數的一個map,作用於RDD

foreach(func)
對dataset中的每個元素都使用func

max,min
max 返回RDD中最大/小的元素,如果是元組,那麼返回key最大/小的元素

5. RDD的依賴關係

5.1 RDD和它依賴的父RDD(s)的關係有兩種不同的型別,即窄依賴(narrow dependency)和寬依賴(wide dependency)
5.2 窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用
總結:窄依賴我們形象的比喻為獨生子女
5.3 寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition
總結:窄依賴我們形象的比喻為超生

6. RDD的快取

Spark速度非常快的原因之一,就是在不同操作中可以在記憶體中持久化或快取個數據集。當持久化某個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此RDD或衍生出的RDD進行的其他動作中重用。這使得後續的動作變得更加迅速。RDD相關的持久化和快取,是Spark最重要的特徵之一。可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵。

7. idea 建立scala工程在new的時候沒有scala class 解決方案"

1.ideal 開發scala,首先 要在setting裡面下載一個外掛”scala“

2.然後在project structure -->Global Libraries => +號 =>選擇本機scala的安裝路徑 =>apply=>ok

3.最後在建立檔案的時候, new後有了Scala class