1. 程式人生 > 其它 >Spark學習(四)RDD 概述

Spark學習(四)RDD 概述

spark 為了達到高併發,高吞吐資料處理能力封裝了三大資料物件;
  • RDD 彈性分散式資料集
  • 累加器:分散式共享只寫變數
  • 廣播變數:分散式共享只讀變數

1、RDD 概述

RDD(Resilient Distributed DateSet) 彈性分散式資料集,是 Spark 最基本的資料處理型別,在程式碼中以抽象類方式出現。它代表了 一個彈性,不可變,可分割槽,內部元素可並行執行的計算集合;

1.1、RDD 特點

儲存彈性,記憶體&磁碟自動切換; 容錯彈性:資料丟失容錯彈性 計算彈性:計算出錯重試機制 分片彈性:可根據需要重新分片 分散式 資料儲存在大資料平臺的不同節點上; 資料集:RDD 封裝了計算邏輯,不儲存資料; 資料抽象:RDD 是一個抽象類,需要子類具體實現; 不可變:RDD 的計算邏輯是不可變的,要想修改計算邏輯,需要建立新的RDD; 可分割槽:平行計算;

1.2、RDD 核心屬性

 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)

1、分割槽列表 RDD 資料結構存放在分割槽列表中,用於實現平行計算,是分散式計算的基礎屬性;

 /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   
*/ protected def getPartitions: Array[Partition]

2、分割槽計算函式,spark 在計算時,使用分割槽計算函式對每一個分割槽進行計算

/**
 * :: DeveloperApi ::
 * Implemented by subclasses to compute a given partition.
 */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
3、RDD 之間依賴關係 當計算需要多個計算模型時組合時,需要依賴關係將多個RDD建立依賴關係; 
/**
 * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
 * be called once, so it is safe to implement a time-consuming computation in it.
 */
protected def getDependencies: Seq[Dependency[_]] = deps
4、分割槽器 可選,當資料為 KV 資料時,可以設定分割槽自定義分割槽 
/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
5、首選位置 可選 計算資料時,可通過不同節點的狀態決定計算節點位置 
/**
 * Optionally overridden by subclasses to specify placement preferences.
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil

2、RDD 執行原理 

  從計算的角度來講,資料處理過程中需要計算資源(記憶體&CPU)和計算模型(邏輯)。執行時,需要將計算資源和計算模型進行協調和整合。Spark框架在執行時,先申請資源,然後將應用程式的資料處理邏輯分解成一個一個的計算任務。然後將任務發到已經分配資源的計算節點上,按照指定的計算模型進行資料計算。最後得到計算結果。RDD是Spark框架中用於資料處理的核心模型,接下來我們看看,在Yarn環境中,RDD的工作原理

 1、啟動 Yarn

 

 2、Spark 通過申請資源建立排程節點和計算節點

 

 

 3、spark 框架根據將計算邏輯根據計算需求分區劃分成不同的任務;

4、排程節點將任務根據計算節點狀態傳送到對應的計算節點進行計算

 

從以上流程可以看出RDD在整個流程中主要用於將邏輯進行封裝,並生成Task傳送給Executor節點執行計算,接下來我們就一起看看Spark框架中RDD是具體是如何進行資料處理的。

 3、RDD的建立

 1、讀取外部檔案建立RDD

啟動 spark-shell 

cd /opt/module/spark_local
[hui@hadoop103 spark_local]$ bin/spark-shell 

讀取外部檔案建立RDD

scala> val lines = sc.textFile("README.md")
scala> lines.count()
res1: Long = 103
scala> lines.first()
res2: String = # Apache Spark

這裡讀取了自帶的 一個 README.md 文件, lines.count() 返回了 文件裡的行數,lines.first() 返回了第一行

我們看下 sc 是什麼

scala> sc
res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2dcbf825
scala> lines
res4: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

這裡的 sc 就是 spark-shell 啟動時建立的上下文環境物件。lines 是我們讀取檔案建立的RDD物件。

 IDEA 中讀取外部檔案建立RDD

object Spark01_RDD_file {
  def main(args: Array[String]): Unit = {
    //todo 準備環境
    val spekConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //[*] 表示當前系統最大可用CPU核數
    val sc = new SparkContext(spekConf)
    //todo 建立RDD
    //檔案中建立RDD,經檔案資料作為spark處理資料的源頭
    // path 路徑預設以當前環境的根路徑為基準,可用寫絕對路徑,也可以寫相對路徑
    // //val rdd: RDD[String] = sc.textFile("dates/1.txt")
    // path 可用是具體路徑,直接讀取路徑下的所有檔案
    //val rdd = sc.textFile("dates")
    // path 可用是具體路徑,+ 萬用字元
    val rdd = sc.textFile("dates/1*.txt")
    rdd.collect().foreach(println)
    sc.stop()
  }
}

2、讀取集合資料建立RDD

scala> val input = sc.parallelize(List(1,2,3,4,4))
input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val mapRDD = input.map(x=>x*2)
mapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26

scala> println(mapRDD.collect().mkString(","))
2,4,6,8,8  

這裡我們通過一個簡單的list 來建立RDD,並呼叫了 map 方法對集合做了一個操作。

 IDEA 讀取集合資料建立RDD

object Spark01_RDD_Memory {
  def main(args: Array[String]): Unit = {
    //todo 準備環境
    val spekConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //[*] 表示當前系統最大可用CPU核數
    val sc = new SparkContext(spekConf)
    //todo 建立RDD
    //記憶體中建立RDD,經記憶體資料作為spark處理資料的源頭
    val seq = Seq[Int](1, 2, 3, 4)
    //parallelize 並行
    // val rdd = sc.parallelize(seq)
    // makeRDD 底層呼叫了 parallelize  方法
    val rd  = sc.makeRDD(seq)
 
   /*
    def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)  
   */
    rd.foreach(println) 
      /*
   def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
   */
    sc.stop()//todo 關閉環境
  }
}