深入解析spark RDD
彈性分散式資料集 RDD(只讀,可分割槽) 這個資料集的部分或者全部可以快取在記憶體中。
所謂彈性,是指記憶體不夠時可以與磁碟進行交換。
RDD 作為資料結構,本質上是一個只讀的分割槽記錄集合。一個rdd可以包含多個分割槽,每個分割槽就是一個數據集片段。
寬依賴和窄依賴的區別:narrow dependecies 可以支援同一個cluster node 還是哪個以pipeline形式執行多條命令,如執行了map後可以繼續filter,在失敗恢復的時候只需要重新計算丟失的parent partition即可,而且可以並行的在不同的節點進行重計算。而shuffle dependencies 需要所有的父分割槽都可用,在失敗恢復的時候牽涉個各級的多個parent partitions。
rdd的操作如上圖兩種,轉換和動作。
rdd的四個核心方法:1 getPartitions 返回的是一系列partitions的集合,即一個partition型別的陣列。
2 getDependencies 返回的是依賴關係的一個Seq集合,裡面的Dependency陣列中的下劃線是型別的PlaceHolder(佔位符)。
3 Compute 是針對RDD的每個partition進行計算的
4 getPreferredLocations是尋找partition的首選位置
通過呼叫SparkContext的parallelize方法,在一個已經存在的Scala集合上建立的(一個Seq物件)。集合的物件將會被拷貝,創建出一個可以被並行操作的分散式資料集。
spark-shell執行在叢集上的方法
下面小部分引用 https://blog.csdn.net/legotime/article/details/51871724 推薦看原文 特別不錯 但有個別分割槽問題有待驗證
外部讀取引數textFile
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
分析引數:
path: String 是一個URI,這個URI可以是HDFS、本地檔案(全部的節點都可以),或者其他Hadoop支援的檔案系統URI返回的是一個字串型別的RDD,也就是是RDD的內部形式是Iterator[(String)]
minPartitions= math.min(defaultParallelism, 2) 是指定資料的分割槽,如果不指定分割槽,當你的核數大於2的時候,不指定分割槽數那麼就是 2
當你的資料大於128M時候,Spark是為每一個塊(block)建立一個分片(Hadoop-2.X之後為128m一個block)
從本地系統讀取整個資料夾
val path = "file:///usr/local/spark/spark-1.6.0-bin-hadoop2.6/licenses/" //local file
val rdd1 = sc.textFile(path,2)
從本地系統中讀取licenses這個資料夾下的所有檔案
這裡特別注意的是,比如這個資料夾下有35個檔案,上面分割槽數設定是2,那麼整個RDD的分割槽數是35*2?
這是錯誤的,這個RDD的分割槽數不管你的partition數設定為多少時,只要license這個資料夾下的這個檔案a.txt
(比如有a.txt)沒有超過128m,那麼a.txt就只有一個partition。那麼就是說只要這35個檔案其中沒有一個超過
128m,那麼分割槽數就是 35個
對spark輸出結果進行排序(廣告點選與商品排名)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope
{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
union操作:
結果
groupByKey :
結果:
join操作實際上就是一個笛卡爾積的操作
下面介紹Action操作:
reduce
lookup的使用:
結果:
rdd.toDebugString 可以檢視lineage關係