Spark學習(四)RDD 概述
阿新 • • 發佈:2022-03-15
spark 為了達到高併發,高吞吐資料處理能力封裝了三大資料物件;
- RDD 彈性分散式資料集
- 累加器:分散式共享只寫變數
- 廣播變數:分散式共享只讀變數
1、RDD 概述
RDD(Resilient Distributed DateSet) 彈性分散式資料集,是 Spark 最基本的資料處理型別,在程式碼中以抽象類方式出現。它代表了 一個彈性,不可變,可分割槽,內部元素可並行執行的計算集合;1.1、RDD 特點
儲存彈性,記憶體&磁碟自動切換; 容錯彈性:資料丟失容錯彈性 計算彈性:計算出錯重試機制 分片彈性:可根據需要重新分片 分散式 資料儲存在大資料平臺的不同節點上; 資料集:RDD 封裝了計算邏輯,不儲存資料; 資料抽象:RDD 是一個抽象類,需要子類具體實現; 不可變:RDD 的計算邏輯是不可變的,要想修改計算邏輯,需要建立新的RDD; 可分割槽:平行計算;1.2、RDD 核心屬性
* Internally, each RDD is characterized by five main properties: * * - A list of partitions * - A function for computing each split * - A list of dependencies on other RDDs * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for* an HDFS file)
1、分割槽列表 RDD 資料結構存放在分割槽列表中,用於實現平行計算,是分散式計算的基礎屬性;
/** * Implemented by subclasses to return the set of partitions in this RDD. This method will only * be called once, so it is safe to implement a time-consuming computation in it. * * The partitions in this array must satisfy the following property: * `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`*/ protected def getPartitions: Array[Partition]
2、分割槽計算函式,spark 在計算時,使用分割槽計算函式對每一個分割槽進行計算
/** * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */ @DeveloperApi def compute(split: Partition, context: TaskContext): Iterator[T]3、RDD 之間依賴關係 當計算需要多個計算模型時組合時,需要依賴關係將多個RDD建立依賴關係;
/** * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only * be called once, so it is safe to implement a time-consuming computation in it. */ protected def getDependencies: Seq[Dependency[_]] = deps4、分割槽器 可選,當資料為 KV 資料時,可以設定分割槽自定義分割槽
/** * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil /** Optionally overridden by subclasses to specify how they are partitioned. */ @transient val partitioner: Option[Partitioner] = None5、首選位置 可選 計算資料時,可通過不同節點的狀態決定計算節點位置
/** * Optionally overridden by subclasses to specify placement preferences. */ protected def getPreferredLocations(split: Partition): Seq[String] = Nil
2、RDD 執行原理
從計算的角度來講,資料處理過程中需要計算資源(記憶體&CPU)和計算模型(邏輯)。執行時,需要將計算資源和計算模型進行協調和整合。Spark框架在執行時,先申請資源,然後將應用程式的資料處理邏輯分解成一個一個的計算任務。然後將任務發到已經分配資源的計算節點上,按照指定的計算模型進行資料計算。最後得到計算結果。RDD是Spark框架中用於資料處理的核心模型,接下來我們看看,在Yarn環境中,RDD的工作原理1、啟動 Yarn
2、Spark 通過申請資源建立排程節點和計算節點
3、spark 框架根據將計算邏輯根據計算需求分區劃分成不同的任務;
4、排程節點將任務根據計算節點狀態傳送到對應的計算節點進行計算
從以上流程可以看出RDD在整個流程中主要用於將邏輯進行封裝,並生成Task傳送給Executor節點執行計算,接下來我們就一起看看Spark框架中RDD是具體是如何進行資料處理的。
3、RDD的建立
1、讀取外部檔案建立RDD
啟動 spark-shell
cd /opt/module/spark_local
[hui@hadoop103 spark_local]$ bin/spark-shell
讀取外部檔案建立RDD
scala> val lines = sc.textFile("README.md") scala> lines.count() res1: Long = 103 scala> lines.first() res2: String = # Apache Spark
這裡讀取了自帶的 一個 README.md 文件, lines.count() 返回了 文件裡的行數,lines.first() 返回了第一行
我們看下 sc 是什麼
scala> sc res3: org.apache.spark.SparkContext = org.apache.spark.SparkContext@2dcbf825 scala> lines res4: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24
這裡的 sc 就是 spark-shell 啟動時建立的上下文環境物件。lines 是我們讀取檔案建立的RDD物件。
IDEA 中讀取外部檔案建立RDD
object Spark01_RDD_file { def main(args: Array[String]): Unit = { //todo 準備環境 val spekConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //[*] 表示當前系統最大可用CPU核數 val sc = new SparkContext(spekConf) //todo 建立RDD //檔案中建立RDD,經檔案資料作為spark處理資料的源頭 // path 路徑預設以當前環境的根路徑為基準,可用寫絕對路徑,也可以寫相對路徑 // //val rdd: RDD[String] = sc.textFile("dates/1.txt") // path 可用是具體路徑,直接讀取路徑下的所有檔案 //val rdd = sc.textFile("dates") // path 可用是具體路徑,+ 萬用字元 val rdd = sc.textFile("dates/1*.txt") rdd.collect().foreach(println) sc.stop() } }
2、讀取集合資料建立RDD
scala> val input = sc.parallelize(List(1,2,3,4,4)) input: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24 scala> val mapRDD = input.map(x=>x*2) mapRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26 scala> println(mapRDD.collect().mkString(",")) 2,4,6,8,8
這裡我們通過一個簡單的list 來建立RDD,並呼叫了 map 方法對集合做了一個操作。
IDEA 讀取集合資料建立RDD
object Spark01_RDD_Memory { def main(args: Array[String]): Unit = { //todo 準備環境 val spekConf = new SparkConf().setMaster("local[*]").setAppName("RDD") //[*] 表示當前系統最大可用CPU核數 val sc = new SparkContext(spekConf) //todo 建立RDD //記憶體中建立RDD,經記憶體資料作為spark處理資料的源頭 val seq = Seq[Int](1, 2, 3, 4) //parallelize 並行 // val rdd = sc.parallelize(seq) // makeRDD 底層呼叫了 parallelize 方法 val rd = sc.makeRDD(seq) /* def makeRDD[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) */ rd.foreach(println) /* def collect(): Array[T] = withScope { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) } */ sc.stop()//todo 關閉環境 } }