1. 程式人生 > >深入解析spark RDD

深入解析spark RDD

彈性分散式資料集  RDD(只讀,可分割槽)  這個資料集的部分或者全部可以快取在記憶體中。

所謂彈性,是指記憶體不夠時可以與磁碟進行交換。

RDD 作為資料結構,本質上是一個只讀的分割槽記錄集合。一個rdd可以包含多個分割槽,每個分割槽就是一個數據集片段。

 

寬依賴和窄依賴的區別:narrow dependecies 可以支援同一個cluster node 還是哪個以pipeline形式執行多條命令,如執行了map後可以繼續filter,在失敗恢復的時候只需要重新計算丟失的parent partition即可,而且可以並行的在不同的節點進行重計算。而shuffle dependencies 需要所有的父分割槽都可用,在失敗恢復的時候牽涉個各級的多個parent partitions。

rdd的操作如上圖兩種,轉換和動作。

rdd的四個核心方法:1 getPartitions   返回的是一系列partitions的集合,即一個partition型別的陣列。

2 getDependencies  返回的是依賴關係的一個Seq集合,裡面的Dependency陣列中的下劃線是型別的PlaceHolder(佔位符)。

3  Compute 是針對RDD的每個partition進行計算的

4 getPreferredLocations是尋找partition的首選位置

 

通過呼叫SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq物件)。集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。

spark-shell執行在叢集上的方法  

下面小部分引用 https://blog.csdn.net/legotime/article/details/51871724  推薦看原文  特別不錯  但有個別分割槽問題有待驗證

外部讀取引數textFile

/**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

分析引數:
path: String 是一個URI,這個URI可以是HDFS、本地檔案(全部的節點都可以),或者其他Hadoop支援的檔案系統URI返回的是一個字串型別的RDD,也就是是RDD的內部形式是Iterator[(String)]

minPartitions=  math.min(defaultParallelism, 2) 是指定資料的分割槽,如果不指定分割槽,當你的核數大於2的時候,不指定分割槽數那麼就是 2

當你的資料大於128M時候,Spark是為每一個塊(block)建立一個分片(Hadoop-2.X之後為128m一個block)

從本地系統讀取整個資料夾

val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/"  //local file
val rdd1 = sc.textFile(path,2)
從本地系統中讀取licenses這個資料夾下的所有檔案
這裡特別注意的是,比如這個資料夾下有35個檔案,上面分割槽數設定是2,那麼整個RDD的分割槽數是35*2?

這是錯誤的,這個RDD的分割槽數不管你的partition數設定為多少時,只要license這個資料夾下的這個檔案a.txt

(比如有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個檔案其中沒有一個超過

128m,那麼分割槽數就是 35個

 

對spark輸出結果進行排序(廣告點選與商品排名)

  def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope
  {
    val part = new RangePartitioner(numPartitions, self, ascending)
    new ShuffledRDD[K, V, V](self, part)
      .setKeyOrdering(if (ascending) ordering else ordering.reverse)
  }

union操作:

結果

groupByKey :

結果:

join操作實際上就是一個笛卡爾積的操作  

 

下面介紹Action操作:

reduce  

lookup的使用:

結果:

rdd.toDebugString 可以檢視lineage關係