1. 程式人生 > >Spark RDD詳解

Spark RDD詳解

Spark RDD 大數據 大數據開發

1、RDD是什麽
RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個只讀的,可分區的分布式數據集,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用。

為什麽會產生RDD?

(1)傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在叠代計算式要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法

(2)RDD的具體描述RDD(彈性數據集)是Spark提供的最重要的抽象的概念,它是一種有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編 操作集合的方式,進行各種並行操作。可以將RDD理解為一個具有容錯機制的特殊集合,它提供了一種只讀、只能有已存在的RDD變換而來的共享內存,然後將 所有數據都加載到內存中,方便進行多次重用。a.他是分布式的,可以分布在多臺機器上,進行計算。b.他是彈性的,計算過程中內錯不夠時它會和磁盤進行數 據交換。c.這些限制可以極大的降低自動容錯開銷d.實質是一種更為通用的叠代並行計算框架,用戶可以顯示的控制計算的中間結果,然後將其自由運用於之後 的計算。

(3)RDD的容錯機制實現分布式數據集容錯方法有兩種:數據檢查點和記錄更新RDD采用記錄更新的方式:記錄所有更新點的成本很高。所以,RDD只支持 粗顆粒變換,即只記錄單個塊上執行的單個操作,然後創建某個RDD的變換序列(血統)存儲下來;變換序列指,每個RDD都包含了他是如何由其他RDD變換 過來的以及如何重建某一塊數據的信息。因此RDD的容錯機制又稱“血統”容錯。 要實現這種“血統”容錯機制,最大的難題就是如何表達父RDD和子RDD之間的依賴關系。實際上依賴關系可以分兩種,窄依賴和寬依賴:窄依賴:子RDD中 的每個數據塊只依賴於父RDD中對應的有限個固定的數據塊;寬依賴:子RDD中的一個數據塊可以依賴於父RDD中的所有數據塊。例如:map變換,子 RDD中的數據塊只依賴於父RDD中對應的一個數據塊;groupByKey變換,子RDD中的數據塊會依賴於多有父RDD中的數據塊,因為一個key可 能錯在於父RDD的任何一個數據塊中 將依賴關系分類的兩個特性:第一,窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據;寬依賴則要等到父RDD所 有數據都計算完成之後,並且父RDD的計算結果進行hash並傳到對應節點上之後才能計算子RDD。第二,數據丟失時,對於窄依賴只需要重新計算丟失的那 一塊數據來恢復;對於寬依賴則要將祖先RDD中的所有數據塊全部重新計算來恢復。所以在長“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查 點。也是這兩個特性要求對於不同依賴關系要采取不同的任務調度機制和容錯恢復機制。

(4)RDD內部的設計每個RDD都需要包含以下四個部分:a.源數據分割後的數據塊,源代碼中的splits變量b.關於“血統”的信息,源碼中的 dependencies變量c.一個計算函數(該RDD如何通過父RDD計算得到),源碼中的iterator(split)和compute函數d. 一些關於如何分塊和數據存放位置的元信息,如源碼中的partitioner和preferredLocations例如:a.一個從分布式文件系統中的 文件得到的RDD具有的數據塊通過切分各個文件得到的,它是沒有父RDD的,它的計算函數知識讀取文件的每一行並作為一個元素返回給RDD;b.對與一個 通過map函數得到的RDD,它會具有和父RDD相同的數據塊,它的計算函數式對每個父RDD中的元素所執行的一個函數

2、RDD在Spark中的地位及作用

(1)為什麽會有Spark?因為傳統的並行計算模型無法有效的解決叠代計算(iterative)和交互式計算(interactive);而Spark的使命便是解決這兩個問題,這也是他存在的價值和理由。

(2)Spark如何解決叠代計算?其主要實現思想就是RDD,把所有計算的數據保存在分布式的內存中。叠代計算通常情況下都是對同一個數據集做反復的叠代計算,數據在內存中將大大提升IO操作。這也是Spark涉及的核心:內存計算。

(3)Spark如何實現交互式計算?因為Spark是用scala語言實現的,Spark和scala能夠緊密的集成,所以Spark可以完美的運用scala的解釋器,使得其中的scala可以向操作本地集合對象一樣輕松操作分布式數據集。

(4)Spark和RDD的關系?可以理解為:RDD是一種具有容錯性基於內存的集群計算抽象方法,Spark則是這個抽象方法的實現。

3、如何操作RDD?

(1)如何獲取RDDa.從共享的文件系統獲取,(如:HDFS)b.通過已存在的RDD轉換c.將已存在scala集合(只要是Seq對象)並行化 ,通過調用SparkContext的parallelize方法實現d.改變現有RDD的之久性;RDD是懶散,短暫的。(RDD的固化:cache緩 存至內錯; save保存到分布式文件系統)

(2)操作RDD的兩個動作a.Actions:對數據集計算後返回一個數值value給驅動程序;例如:Reduce將數據集的所有元素用某個函數聚合 後,將最終結果返回給程序。b.Transformation:根據數據集創建一個新的數據集,計算後返回一個新RDD;例如:Map將數據的每個元素經 過某個函數計算後,返回一個姓的分布式數據集。

(3)Actions具體內容:

reduce(func) 通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的並發執行
collect() 在Driver的程序中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作後,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM
count() 返回數據集的元素個數
take(n) 返回一個數組,由數據集的前n個元素組成。註意,這個操作目前並非在多個節點上,並行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用)
first() 返回數據集的第一個元素(類似於take(1)
saveAsTextFile(path) 將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本
saveAsSequenceFile(path) 將 數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由 key-value對組成,並都實現了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如 Int,Double,String等等)
foreach(func) 在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變量,或者和外部存儲系統做交互

(4)Transformation具體內容

map(func)返回一個新的分布式數據集,由每個原元素經過func函數轉換後組成

filter(func)返回一個新的數據集,由經過func函數後返回值為true的原元素組成

flatMap(func)類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)

flatMap(func)類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)

sample(withReplacement, frac, seed)根據給定的隨機種子seed,隨機抽樣出數量為frac的數據

union(otherDataset)返回一個新的數據集,由原數據集和參數聯合而成

groupByKey([numTasks])在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。註意:默認情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的TaskreduceByKey(func, [numTasks])

在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。

join(otherDataset, [numTasks])在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集

groupWith(otherDataset, [numTasks])在類型為(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup

cartesian(otherDataset)笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。

flatMap(func)類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素)

Spark RDD詳解