1. 程式人生 > >彈性分散式資料集RDD(概念)

彈性分散式資料集RDD(概念)

RDD概述:
RDD(Resilient DistributedDataset)叫做分散式資料集
是Spark中最基本的資料抽象,它代表一個不可變,可分割槽,裡面的元素可平行計算的集合
RDD具有資料流模型的特點:自動容錯;位置感知性排程和可伸縮性.
RDD允許使用者在執行多個查詢時顯式地將工作集快取在記憶體中,後續的查詢能夠重用工作集,這極大地提升了查詢速度

RDD的屬性:
1.一組分片(Partition),即資料集的基本組成單位.對於RDD來說,每個分片都會被一個計算任務處理,並決定平行計算的粒度.使用者可以在建立RDD時指定RDD的分片個數,如果沒有指定,那麼就會採用預設值.預設值就是程式所分配到的CPU Core的數目

2.一個計算每個分割槽的函式.Spark中RDD的計算是以分片為單位的,每個RDD都會實現compute函式以達到這個目的.compute函式會對迭代器進行復合,不需要儲存每次計算的結果

3.RDD之間的依賴關係.RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關係.在部分分割槽資料丟失時,Spark可以通過這個依賴關係重新計算丟失的分割槽資料,而不是對RDD的所有分割槽進行重新計算

4.一個Partitioner,即RDD的分片函式;當前Spark中實現了兩種型別的分片函式,一個是基於雜湊的HashPartitioner,另一個是基於範圍的RangePartitioner.只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Partitioner的值是None;Partitioner函式不但決定了RDD本身的分片數量,也決定了parent RDD Shuffle輸出時的分片數量

5.一個列表,儲存存取每個Partitioner的優先位置(Preferred location).對於一個HDFS檔案來說,這個列表儲存的就是每個Partitioner所在的塊的位置;按照”移動資料不如移動計算”的理念,Spark在進行任務排程的時候,會盡可能地將計算任務分配到其所要處理資料塊的儲存位置

建立RDD
1.由一個已經存在的Scala集合建立
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8,9))

2.由外部儲存系統的資料集建立,包括本地的檔案系統,還有所有Hadoop支援的資料集
val rdd2 = sc.textFile(“hdfs://minimaster:9000/words”)

RDD程式設計API
Transformation
RDD中的所有轉換都是延遲載入,也就是說,它們並不會直接計算結果;它們只是記住這些應用到基礎資料集(一個檔案)上的轉換動作.只有當發生一個要求返回結果給Driver的動作時,這些轉換才會真正執行

常用的Transformation:
mapPartitions(func): 獨立地在RDD事務每一個分片上執行,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func): 類似於mapPartitions,但func帶有一個整數引數表示分片的索引值,因此在型別為T的RDD上執行時,func的函式型別必須是Iterator[T] => Iterator[U]

寬依賴與窄依賴的誤區??
join間的寬依賴與窄依賴

RDD的快取
Spark速度非常快的原因:在不同操作中可以在記憶體中持久化或快取資料集.當持久化某個RDD後,每一個節點都將把計算的分片結果儲存在記憶體中,並在對此RDD或衍生出的RDD進行的其他動作中重用.這使得後續的動作變得更加迅速.RDD相關的持久化和快取,是Spark最重要的特徵之一;可以說,快取是Spark構建迭代式演算法和快速互動式查詢的關鍵

將經常用到的結果資料或者shuffle以後的資料往往先快取起來:
1.便於以後快捷訪問
2.提高結果資料的安全性

RDD快取方式:
RDD通過persist方法或cache方法可以將前面的計算結果快取,但是並不是這兩個方法被呼叫時立即快取,而是觸發後面的action時,該RDD將會被快取在計算節點的記憶體中,並供後面重用
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
通過檢視原始碼發現cache最終也是呼叫了persist方法,預設的儲存級別都是僅在記憶體儲存一份
快取有可能丟失,或者儲存於記憶體的資料由於記憶體不足而被刪除,RDD的快取容錯機制保證了即使快取丟失也能保證計算的正確執行.通過基於RDD的一系列轉換,丟失的資料會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition

DAG的生成
DAG(Directed Acyclic Graph)叫做有向無環圖,原始的RDD通過一系列的轉換就形成了DGA,根據RDD之間的依賴關係的不同將DGA劃分成不同的Stage,對於窄依賴,partition的轉換處理在Stage中完成計算;對於寬依賴,由於有Shuffle的存在,只能在parent RDD處理完成後,才能開始接下來的計算,因此寬依賴是劃分Stage的依據
DAG的生成與Stage的劃分

Stage劃分:
為什麼劃分Stage??
因為要把RDD最終生成一個個的task提交到Executor執行,所以需要把RDD先stage劃分再生成task
Stage劃分
stage劃分的依據??
檢視父RDD是否發生了寬依賴,然後通過遞迴,從最後一個Action型別的RDD開始,從後往前推,當遇到發生寬依賴的RDD,就把前面所有的RDD劃分一個Stage,最後所有的RDD再劃分一個stage,直到所有的RDD都劃分完

RDD的生成,stage的劃分,任務的生成,任務的提交這4個階段
4個階段

Yarn和Spark的提交任務:
ResourceManager相當於Master,負責任務排程
NodeManager相當於Worker,負責建立一個容器和啟動自己的子程序(YarnChild和Executor)
Client相當於Driver,用來提交任務
YarnChild:相當於Executor,直接參與計算的程序
Yarn和Spark

Accumulator(累加器)
Spark提供的Accumulator,主要用於多個節點對一個變數進行共享操作
他提供了多個task對一個變數並行的操作的功能
task只能對Accumulator進行累加的操作,不能讀取其值
只有Driver程式可以讀取其值

Task的生成依據Stage的劃分,在Stage中會首先劃分pipeline,然後根據pipeline生成Task

shuffle Read: 把父RDD的資料讀取到子RDD中,發生在Shuffle之後

Shuffle Write: 把中間結果資料寫到磁碟,為了保證資料的安全性

為什麼Shuffle Write到磁碟而不是記憶體?
1.避免因為結果資料太大而佔用太多的記憶體資源,造成記憶體溢位
2.儲存到磁碟可以保證資料的安全性

取消cache() → unpersist()

為什麼要checkpoint??
執行處的中間結果往往很重要,所以為了保證資料的安全性,要把資料做檢查點,最好把資料checkpoint到HDFS,便於該叢集所有節點訪問到
在checkpoint之前最好先cache一下,就是先把資料快取到記憶體,這樣便於執行任務時呼叫,也便於在checkpoint的時候直接從快取獲取資料

在什麼時候做checkpoint??
在發生shuffle之後做checkpoint;

checkpoint的步驟:
1.建立checkpoint儲存目錄
sc.setCheckpointDir(“hdfs://…..”)
2.把資料cache起來
rdd.cache()
3.checkpoint
rdd.checkpoint()
rdd.collect