spark core之RDD編程
在spark中,對數據的所有操作不外乎創建RDD、轉化已有RDD及調用RDD操作進行求值。spark會自動地將RDD中的數據分發到集群中並行執行。
五大特性
- a list of partitions
RDD是一個由多個partition(某個節點裏的某一片連續的數據)組成的的list;將數據加載為RDD時,一般會遵循數據的本地性(一般一個hdfs裏的block會加載為一個partition)。 - a function for computing each split
RDD的每個partition中都會有function,即函數應用,其作用是實現RDD之間partition的轉換。 - a list of dependencies on other RDDs
RDD會記錄它的依賴,為了容錯(重算,cache,checkpoint),即內存中的RDD操作出錯或丟失時會進行重算。 - Optionally,a Partitioner for Key-value RDDs
可選項,如果RDD裏面存的數據是key-value形式,則可以傳遞一個自定義的Partitioner進行重新分區,例如自定義的Partitioner是基於key進行分區,那則會將不同RDD裏面的相同key的數據放到同一個partition裏面。 - Optionally, a list of preferred locations to compute each split on
可選項,最優的位置去計算每個分片,即數據的本地性。創建RDD
spark提供了兩種創建RDD的方式:讀取外部數據源、將驅動器程序中的集合進行並行化。
並行化集合
使用sparkContext的parallelize()方法將集合並行化。
parallelize()方法第二個參數可指定分區數。spark會為每個分區創建一個task任務,通常每個cpu需要2-4個分區。spark會自動地根據集群大小設置分區數,也支持通過parallelize()方法的第二個參數手動指定。scala
val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data)
java
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data);
python
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)
註:除了開發和測試外,這種方式用得不多。這種方式需要把整個數據集先放到一臺機器的內存中。
讀取外部數據源
spark可接入多種hadoop支持的數據源來創建分布式數據集。包括:本地文件系統、HDFS、Cassandra、HBase、Amazon S3等。
spark支持多種存儲格式,包括textFiles、SequenceFiles及其他hadoop存儲格式。scala
scala> val distFile = sc.textFile("data.txt") distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
java
JavaRDD<String> distFile = sc.textFile("data.txt");
python
>>> distFile = sc.textFile("data.txt")
RDD操作
RDD支持兩種操作:轉化操作和行動操作。
轉化操作
RDD的轉化操作會返回一個新的RDD。轉化操作是惰性求值的,只有行動操作用到轉化操作生成的RDD時,才會真正進行轉化。
spark使用lineage(血統)來記錄轉化操作生成的不同RDD之間的依賴關系。依賴分為窄依賴(narrow dependencies)和寬依賴(wide dependencies)。
- 窄依賴
- 子RDD的每個分區依賴於常數個父分區
- 輸入輸出一對一,結果RDD的分區結構不變,主要是map、flatMap
- 輸入輸出一對一,但結果RDD的分區結構發生變化,如union、coalesce
- 從輸入中選擇部分元素的算子,如filter、distinct、subtract、sample
-
寬依賴
- 子RDD的每個分區依賴於所有父RDD分區
- 對單個RDD基於key進行重組和reduce,如groupByKey、reduceByKey
-
對兩個RDD基於key進行合並和重組,如join
行動操作
行動操作則會向驅動器程序返回結果或把結果寫入外部系統,會觸發實際的計算。
緩存方式
RDD通過persist方法或cache方法可以將前面的計算結果緩存,但是並不是這兩個方法被調用時立即緩存,而是觸發後面的action時,該RDD將會被緩存在計算節點的內存中,並供後面重用。
cache最終也是調用了persist方法,默認的存儲級別是僅在內存存儲一份。
Spark的存儲級別還有好多種,存儲級別在object StorageLevel中定義的。
緩存有可能丟失,RDD的緩存容錯機制保證即使緩存丟失也能保證計算正確執行。通過基於RDD的一系列轉換,丟失的數據會被重算,由於RDD的各個Partition是相對獨立的,因此只需要計算丟失的部分即可,並不需要重算全部Partition。容錯機制
-
Lineage機制
-
RDD的Lineage記錄的是粗粒度的特定數據Transformation操作行為。當RDD的部分分區數據丟失時,可以通過Lineage來重新運算和恢復丟失的數據分區。這種粗顆粒的數據模型,限制了Spark的運用場合,所以Spark並不適用於所有高性能要求的場景,但同時相比細顆粒度的數據模型,也帶來了性能的提升。
-
Spark Lineage機制是通過RDD的依賴關系來執行的
-
窄依賴可以在某個計算節點上直接通過計算父RDD的某塊數據計算得到子RDD對應的某塊數據。
- 寬依賴則要等到父RDD所有數據都計算完成後,將父RDD的計算結果進行hash並傳到對應節點上之後才能計算子RDD。寬依賴要將祖先RDD中的所有數據塊全部重新計算,所以在長“血統”鏈特別是有寬依賴的時候,需要在適當的時機設置數據檢查點。
-
-
-
Checkpoint機制
-
簡介
- 當RDD的action算子觸發計算結束後會執行checkpoint;Task計算失敗的時候會從checkpoint讀取數據進行計算。
-
實現方式(checkpoint有兩種實現方式,如果代碼中沒有設置checkpoint,則使用local的checkpoint模式,如果設置路徑,則使用reliable的checkpoint模式。)
-
LocalRDDCheckpointData:臨時存儲在本地executor的磁盤和內存上。該實現的特點是比較快,適合lineage信息需要經常被刪除的場景(如GraphX),可容忍executor掛掉。
- ReliableRDDCheckpointData:存儲在外部可靠存儲(如hdfs),可以達到容忍driver 掛掉情況。雖然效率沒有存儲本地高,但是容錯級別最好。
-
-
-
spark core之RDD編程