1. 程式人生 > >Spark計算的核心RDD

Spark計算的核心RDD

在SparkCore中的一切計算都是基於RDD的,那RDD是個什麼東西呢?

RDD是Resilient Distribute Dataset(彈性分散式資料集)的縮寫,說白了,RDD可以理解為spark處理資料的基本單位,但是RDD又不是真實的存有資料,它只是具有操作資料的能力,相當於一個租房中介,中介手上掌握了一手的房源資訊,而sparkCore就相當於租房子的人,一般直接找到房子不簡單,所以我們找到中介,中介向我們提供租房資訊,我們在中介提供的資訊的基礎上進行價格,位置等篩選處理,就相當於spark通過RDD對源資料進行計算。好了,現在我們理解了RDD是個什麼東西,現在直接上一個WordCount的scala程式碼分析來幫助我們更加深入地理解RDD,圖片不清晰可單擊放大檢視高清大圖:
在這裡插入圖片描述

看完了以上的WordCount原始碼分析後相信大家對RDD有一定的理解了,接下來介紹RDD的一些特性。RDD具有五大特性:

  1. A list of partitions
  2. A function for computing each partition
  3. A list of dependencies on other RDDs
  4. Optionally, a Partitioner for key-value RDDs
  5. Optionally, a list of preferred locations to compute each split on

這個是官網對於RDD的五大特性的解釋,翻譯過來差不多就是:

  1. RDD是由一系列的partition組成的
  2. RDD提供的每一個函式實際上是作用於每一個partition上的
  3. RDD具有一系列的依賴關係,依賴於其他的RDD
  4. 可選項,分割槽器是作用在KV格式的RDD上的(KV格式的的RDD:如果RDD中的資料是二元組型別的,那麼我們就稱這個RDD是KV格式的RDD,如上圖的pairRDD和resRDD)
  5. 可選項,RDD會提供一系列的最佳的計算位置(RDD提供了一個方法介面,直接呼叫這個方法介面就能拿到RDD所有的partition的分割槽的位置)

對於第一個特性RDD是由一系列的partition組成的,可以參照下圖:

在這裡插入圖片描述

結合WordCount例項的RDD分析:
在這裡插入圖片描述

由上圖我們很容易就能看出RDD的依賴關係。RDD的依賴關係還可以用一張人類進化圖表示:
在這裡插入圖片描述
進化完成的人類就相當於我們經過一系列處理得到的RDD。

綜上,我們可以知道,RDD是sparkCore計算的基本核心,RDD是邏輯上的概念,本身不儲存資料,但是具有操作資料的能力,partition也是一樣的。
上面用到的WordCount的原始碼:

package com.hpe.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object WordCountSpark {
    def main(args: Array[String]): Unit = {
      //建立配置物件
      val conf = new SparkConf()
      //設定app的名稱  有啥用?方便在監控頁面找到
      conf.setAppName("WordCount")
      //設定spark的執行模式   local本地執行  用於測試環境
      conf.setMaster("local")
      //建立上下文    他是通往叢集的唯一通道
      val sc = new SparkContext(conf)
      /**
       * 處理資料		在sparkCore中一切計算都是基於RDD
       * R(resilient)D(distribute)D(dataset)
       * RDD:彈性分散式資料集
       */
      val lineRDD = sc.textFile("d:/wc.txt")
      //基於lineRDD中的資料,進行分詞
      val wordRDD = lineRDD.flatMap { _.split(" ") }
      //將每一個單詞基數為1   pairRDD  K:word V:1
      val pairRDD = wordRDD.map { x => (x,1)}
      //將相同的單詞分組,組內資料累加  hello:{1,1,1,1,1}
      //restRDD K:word V:count
      //val resRDD = pairRDD.reduceByKey((v1,v2) => v1+v2)
    	val resRDD = pairRDD.reduceByKey(_+_)
      resRDD.foreach(println)
      
      
      sc.stop()
    }
}