Spark計算的核心RDD
阿新 • • 發佈:2018-12-22
在SparkCore中的一切計算都是基於RDD的,那RDD是個什麼東西呢?
RDD是Resilient Distribute Dataset(彈性分散式資料集)的縮寫,說白了,RDD可以理解為spark處理資料的基本單位,但是RDD又不是真實的存有資料,它只是具有操作資料的能力,相當於一個租房中介,中介手上掌握了一手的房源資訊,而sparkCore就相當於租房子的人,一般直接找到房子不簡單,所以我們找到中介,中介向我們提供租房資訊,我們在中介提供的資訊的基礎上進行價格,位置等篩選處理,就相當於spark通過RDD對源資料進行計算。好了,現在我們理解了RDD是個什麼東西,現在直接上一個WordCount的scala程式碼分析來幫助我們更加深入地理解RDD,圖片不清晰可單擊放大檢視高清大圖:
看完了以上的WordCount原始碼分析後相信大家對RDD有一定的理解了,接下來介紹RDD的一些特性。RDD具有五大特性:
- A list of partitions
- A function for computing each partition
- A list of dependencies on other RDDs
- Optionally, a Partitioner for key-value RDDs
- Optionally, a list of preferred locations to compute each split on
這個是官網對於RDD的五大特性的解釋,翻譯過來差不多就是:
- RDD是由一系列的partition組成的
- RDD提供的每一個函式實際上是作用於每一個partition上的
- RDD具有一系列的依賴關係,依賴於其他的RDD
- 可選項,分割槽器是作用在KV格式的RDD上的(KV格式的的RDD:如果RDD中的資料是二元組型別的,那麼我們就稱這個RDD是KV格式的RDD,如上圖的pairRDD和resRDD)
- 可選項,RDD會提供一系列的最佳的計算位置(RDD提供了一個方法介面,直接呼叫這個方法介面就能拿到RDD所有的partition的分割槽的位置)
對於第一個特性RDD是由一系列的partition組成的,可以參照下圖:
結合WordCount例項的RDD分析:
由上圖我們很容易就能看出RDD的依賴關係。RDD的依賴關係還可以用一張人類進化圖表示:
進化完成的人類就相當於我們經過一系列處理得到的RDD。
綜上,我們可以知道,RDD是sparkCore計算的基本核心,RDD是邏輯上的概念,本身不儲存資料,但是具有操作資料的能力,partition也是一樣的。
上面用到的WordCount的原始碼:
package com.hpe.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object WordCountSpark {
def main(args: Array[String]): Unit = {
//建立配置物件
val conf = new SparkConf()
//設定app的名稱 有啥用?方便在監控頁面找到
conf.setAppName("WordCount")
//設定spark的執行模式 local本地執行 用於測試環境
conf.setMaster("local")
//建立上下文 他是通往叢集的唯一通道
val sc = new SparkContext(conf)
/**
* 處理資料 在sparkCore中一切計算都是基於RDD
* R(resilient)D(distribute)D(dataset)
* RDD:彈性分散式資料集
*/
val lineRDD = sc.textFile("d:/wc.txt")
//基於lineRDD中的資料,進行分詞
val wordRDD = lineRDD.flatMap { _.split(" ") }
//將每一個單詞基數為1 pairRDD K:word V:1
val pairRDD = wordRDD.map { x => (x,1)}
//將相同的單詞分組,組內資料累加 hello:{1,1,1,1,1}
//restRDD K:word V:count
//val resRDD = pairRDD.reduceByKey((v1,v2) => v1+v2)
val resRDD = pairRDD.reduceByKey(_+_)
resRDD.foreach(println)
sc.stop()
}
}