第五章_Spark核心程式設計_Rdd_血緣關係
阿新 • • 發佈:2022-04-01
1. RDD 血緣關係
/*RDD 血緣關係*/ /* * 1. 什麼是Rdd的血緣關係? * 1.RDD 只支援粗粒度轉換,即在大量記錄上執行的單個操作。 * 2.將建立 RDD 的一系列 Lineage (血統)記錄下來,以便恢復丟失的分割槽。 * 3.RDD的 Lineage 會記錄RDD的 元資料資訊和轉換行為 * 當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽 * * 2. 怎樣檢視 Rdd的血緣關係? * rdd.toDebugString * */
2. RDD 依賴關係
/*RDD 依賴關係*/ /* * 1. 什麼是Rdd的依賴關係? * 當前Rdd和父Rdd的依賴關係 * 2. 怎樣檢視 Rdd的依賴關係? * rdd.dependencies * */
3. RDD 窄依賴&寬依賴
/*RDD 窄依賴&寬依賴*/ /* * 1.什麼是窄依賴? * 當前Rdd的1個分割槽 最多依賴父Rdd的一個分割槽 * 沒有Shuffle過程,例如map、flatmap * * 2.什麼是寬依賴? * 當前Rdd的1個分割槽 依賴父Rdd的多個分割槽資料 * 有SHuffle過程,例如groupBy **/
4. 示例
//檢視 Rdd的血緣關係 object FindLineAge extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) private val rdd: RDD[String] = sc.textFile("Spark_319/src/data/*.txt") private val rdd1: RDD[String] = rdd.flatMap(_.split(" "))private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e) private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size)) println("****rdd*********************") println(rdd.toDebugString) println("****rdd1*********************") println(rdd1.toDebugString) println("*****rdd2********************") println(rdd2.toDebugString) println("*****rdd3********************") println(rdd3.toDebugString) rdd3.collect().foreach(println(_)) sc.stop() } //檢視 Rdd的依賴關係 object Finddepend extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) private val rdd: RDD[String] = sc.textFile("Spark_319/src/data/*.txt") private val rdd1: RDD[String] = rdd.flatMap(_.split(" ")) private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e) private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size)) println("****rdd*********************") println(rdd.dependencies) println("****rdd1*********************") println(rdd1.dependencies) println("*****rdd2********************") println(rdd2.dependencies) println("*****rdd3********************") println(rdd3.dependencies) rdd3.collect().foreach(println(_)) sc.stop() }
****rdd********************* (2) Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 [] | Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 [] ****rdd1********************* (2) MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 [] | Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 [] | Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 [] *****rdd2******************** (2) ShuffledRDD[4] at groupBy at 血緣關係.scala:60 [] +-(2) MapPartitionsRDD[3] at groupBy at 血緣關係.scala:60 [] | MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 [] | Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 [] | Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 [] *****rdd3******************** (2) MapPartitionsRDD[5] at map at 血緣關係.scala:62 [] | ShuffledRDD[4] at groupBy at 血緣關係.scala:60 [] +-(2) MapPartitionsRDD[3] at groupBy at 血緣關係.scala:60 [] | MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 [] | Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 [] | Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 [] ****rdd********************* List(org.apache.spark.OneToOneDependency@512575e9) ****rdd1********************* List(org.apache.spark.OneToOneDependency@617389a) *****rdd2******************** List(org.apache.spark.ShuffleDependency@348ad293) *****rdd3******************** List(org.apache.spark.OneToOneDependency@30f74e79)