1. 程式人生 > >SparkRDD之彈性分布式數據集RDD

SparkRDD之彈性分布式數據集RDD

oop src 選擇 丟失 park 非循環 nal part 可用

2.RDD概述
2.1什麽是RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數據集,是Spark中最基本的數據抽象,它代表一個不可變、可分區、裏面的元素可並行計算的集合。RDD具有數據流模型的特點:自動容錯、位置感知性調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將數據緩存在內存中,後續的查詢能夠重用這些數據,這極大地提升了查詢速度。
Dataset:一個數據集合,用於存放數據的。
Distributed:RDD中的數據是分布式存儲的,可用於分布式計算。
Resilient:RDD中的數據可以存儲在內存中或者磁盤中。
2.2RDD的屬性
技術分享圖片
1) A list of partitions :一個分區(Partition)列表,數據集的基本組成單位。
對於RDD來說,每個分區都會被一個計算任務處理,並決定並行計算的粒度。用戶可以在創建RDD時指定RDD的分區個數,如果沒有指定,那麽就會采用默認值。(比如:讀取HDFS上數據文件產生的RDD分區數跟block的個數相等)
2)A function for computing each split :一個計算每個分區的函數。
Spark中RDD的計算是以分區為單位的,每個RDD都會實現compute函數以達到這個目的。
3)A list of dependencies on other RDDs:一個RDD會依賴於其他多個RDD,RDD之間的依賴關系。
RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似於流水線一樣的前後依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。
4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):一個Partitioner,即RDD的分區函數(可選項)。
當前Spark中實現了兩種類型的分區函數,一個是基於哈希的HashPartitioner,另外一 個是基於範圍的RangePartitioner。只有對於key-value的RDD,才會有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數決定了parent RDD Shuffle輸出時的分區數量。
5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file):一個列表,存儲每個Partition的優先位置(可選項)。
對於一個HDFS文件來說,這個列表保存的就是每個Partition所在的塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置(spark進行任務分配的時候盡可能選擇那些存有數據的worker節點來進行任務計算)。
2.3為什麽會產生RDD?
(1) 傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在叠代計算中要進行大量的磁盤IO操作。RDD正是解決這一缺點的抽象方法。
(2) RDD是Spark提供的最重要的抽象的概念,它是一種具有容錯機制的特殊集合,可以分布在集群的節點上,以函數式編程來操作集合,進行各種並行操作。可以把RDD的結果數據進行緩存,方便進行多次重用,避免重復計算。
2.4RDD在Spark中的地位及作用
(1) 為什麽會有Spark?
因為傳統的並行計算模型無法有效的解決叠代計算(iterative)和交互式計算(interactive);而Spark的使命便是解決這兩個問題,這也是他存在的價值和理由。
(2) Spark如何解決叠代計算?
其主要實現思想就是RDD,把所有計算的數據保存在分布式的內存中。叠代計算通常情況下都是對同一個數據集做反復的叠代計算,數據在內存中將大大提升IO操作。這也是Spark涉及的核心:內存計算。
(3) Spark如何實現交互式計算?
因為Spark是用scala語言實現的,Spark和scala能夠緊密的集成,所以Spark可以完美的運用scala的解釋器,使得其中的scala可以向操作本地集合對象一樣輕松操作分布式數據集。
(4) Spark和RDD的關系?
RDD是一種具有容錯性、基於內存計算的抽象方法,RDD是Spark Core的底層核心,Spark則是這個抽象方法的實現。

  1. 創建RDD
    1)由一個已經存在的Scala集合創建。
    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    2)由外部存儲系統的文件創建。包括本地的文件系統,還有所有Hadoop支持的數據集,比如HDFS、Cassandra、HBase等。
    val rdd2 = sc.textFile("/words.txt")
    3)已有的RDD經過算子轉換生成新的RDD
    val rdd3=rdd2.flatMap(_.split(" "))

SparkRDD之彈性分布式數據集RDD