1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_血緣關係

第五章_Spark核心程式設計_Rdd_血緣關係

1. RDD 血緣關係

  /*RDD 血緣關係*/
  /*
  * 1. 什麼是Rdd的血緣關係?
  *   1.RDD 只支援粗粒度轉換,即在大量記錄上執行的單個操作。
  *   2.將建立 RDD 的一系列 Lineage (血統)記錄下來,以便恢復丟失的分割槽。
  *   3.RDD的 Lineage 會記錄RDD的 元資料資訊和轉換行為
  *       當該RDD的部分分割槽資料丟失時,它可以根據這些資訊來重新運算和恢復丟失的資料分割槽
  *
  * 2. 怎樣檢視 Rdd的血緣關係?
  *    rdd.toDebugString
  * */

2. RDD 依賴關係

  /*
RDD 依賴關係*/ /* * 1. 什麼是Rdd的依賴關係? * 當前Rdd和父Rdd的依賴關係 * 2. 怎樣檢視 Rdd的依賴關係? * rdd.dependencies * */

3. RDD 窄依賴&寬依賴

  /*RDD 窄依賴&寬依賴*/
  /*
  * 1.什麼是窄依賴?
  *     當前Rdd的1個分割槽 最多依賴父Rdd的一個分割槽
  *     沒有Shuffle過程,例如map、flatmap
  *
  * 2.什麼是寬依賴?
  *     當前Rdd的1個分割槽 依賴父Rdd的多個分割槽資料
  *     有SHuffle過程,例如groupBy
  * 
*/

4. 示例

  //檢視 Rdd的血緣關係
  object FindLineAge extends App {

    val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest")

    val sc: SparkContext = new SparkContext(sparkconf)

    private val rdd: RDD[String] = sc.textFile("Spark_319/src/data/*.txt")


    private val rdd1: RDD[String] = rdd.flatMap(_.split(" "))


    
private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e) private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size)) println("****rdd*********************") println(rdd.toDebugString) println("****rdd1*********************") println(rdd1.toDebugString) println("*****rdd2********************") println(rdd2.toDebugString) println("*****rdd3********************") println(rdd3.toDebugString) rdd3.collect().foreach(println(_)) sc.stop() } //檢視 Rdd的依賴關係 object Finddepend extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) private val rdd: RDD[String] = sc.textFile("Spark_319/src/data/*.txt") private val rdd1: RDD[String] = rdd.flatMap(_.split(" ")) private val rdd2: RDD[(String, Iterable[String])] = rdd1.groupBy(e => e) private val rdd3: RDD[(String, Int)] = rdd2.map(tp => (tp._1, tp._2.size)) println("****rdd*********************") println(rdd.dependencies) println("****rdd1*********************") println(rdd1.dependencies) println("*****rdd2********************") println(rdd2.dependencies) println("*****rdd3********************") println(rdd3.dependencies) rdd3.collect().foreach(println(_)) sc.stop() }
****rdd*********************
(2) Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 []
 |  Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 []
****rdd1*********************
(2) MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 []
 |  Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 []
 |  Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 []
*****rdd2********************
(2) ShuffledRDD[4] at groupBy at 血緣關係.scala:60 []
 +-(2) MapPartitionsRDD[3] at groupBy at 血緣關係.scala:60 []
    |  MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 []
    |  Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 []
    |  Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 []
*****rdd3********************
(2) MapPartitionsRDD[5] at map at 血緣關係.scala:62 []
 |  ShuffledRDD[4] at groupBy at 血緣關係.scala:60 []
 +-(2) MapPartitionsRDD[3] at groupBy at 血緣關係.scala:60 []
    |  MapPartitionsRDD[2] at flatMap at 血緣關係.scala:57 []
    |  Spark_319/src/data/*.txt MapPartitionsRDD[1] at textFile at 血緣關係.scala:54 []
    |  Spark_319/src/data/*.txt HadoopRDD[0] at textFile at 血緣關係.scala:54 []


****rdd*********************
List(org.apache.spark.OneToOneDependency@512575e9)
****rdd1*********************
List(org.apache.spark.OneToOneDependency@617389a)
*****rdd2********************
List(org.apache.spark.ShuffleDependency@348ad293)
*****rdd3********************
List(org.apache.spark.OneToOneDependency@30f74e79)