1. 程式人生 > 實用技巧 >關於Spark RDD 的認識

關於Spark RDD 的認識

一、基本認識

RDD是Spark大資料計算引擎中,抽象的一種資料結構。

RDD(Resilient Distributed Dataset),中文意思是彈性分散式資料集,它是Spark中的基本抽象。在Spark原始碼中,有下面的註釋:

RDD 有五個主要的屬性:

  • A list of partitions (分割槽列表)
  • A function for computing each split (分割槽計算函式) 相同的計算邏輯應用在不同的分割槽中
  • A list of dependencies on other RDDs (多個RDD之間存在依賴關係)
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (對鍵值對型別的資料進行分割槽)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (首選位置,計算資料的位置)
二、RDD的執行原理 類似於IO處理,體現了裝飾者設計模式。 從計算的角度來看,計算過程受兩個因素的影響:計算資源和計算邏輯。執行計算的過程就是將計算資源和計算邏輯進行一定的整合。 spark在執行計算的過程中,會先向叢集申請資源,然後把程式的處理邏輯分成一個個的計算任務,然後把任務發到已經分配資源的計算節點上。按照指定的計算模型進行資料計算。 RDD是spark用於資料處理的核心模型。Yarn環境中,RDD的執行原理如下所示:
  • 啟動Yarn叢集資源
  • spark申請資源,建立排程節點和計算節點
  • 根據需求,spark把計算邏輯,根據分割槽,劃分成不同的任務
  • 排程節點把任務根據計算節點的狀態,傳送到對應的計算節點上進行計算
三、在程式碼中使用RDD 1、建立RDD 從集合(記憶體)、外部檔案、其它RDD中建立RDD,程式碼如下:
 1 import org.apache.spark.rdd.RDD
 2 import org.apache.spark.{SparkConf, SparkContext}
 3 
 4 object Spark01RddCreate {
5 def main(args: Array[String]): Unit = { 6 System.setProperty("hadoop.home.dir", "C:\\Hadoop\\") 7 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark01rddmemory") 8 val sc = new SparkContext(sparkConf) 9 10 // TODO: Spark 從記憶體中建立RDD 11 val list = List(1, 2, 3, 4) 12 val rdd1 = sc.parallelize(list) 13 val rdd2 = sc.makeRDD(list) 14 rdd1.collect().foreach(println) 15 rdd2.collect().foreach(println) 16 17 // TODO: Spark 從外部檔案中建立RDD 18 val sc_text: RDD[String] = sc.textFile("G:\\SNBRecommendSys\\recommender\\DataLoader\\src\\main\\input_data") 19 System.out.println("從外部檔案中建立RDD:\n") 20 sc_text.collect().foreach(println) 21 22 // TODO: Spark 從其他RDD建立RDD 23 val flatRDD = sc_text.flatMap(line => { 24 line.split(" ") 25 }) 26 System.out.println("從其他RDD建立RDD:\n") 27 flatRDD.collect().foreach(println) 28 29 sc.stop() 30 } 31 }

2、關於RDD並行度的理解

Spark將一個作業切分為多個任務後,會發送給Excutor節點平行計算,能夠平行計算的任務數量就是並行度。計算的任務數量可以在建立RDD的時候去指定。 RDD中,分割槽的數量就是RDD的並行度,設定並行度就是設定分割槽的數量。 下面程式碼中,我們可以看到設定並行度,就是在建立RDD的時候,傳入的第二個引數值
 1 import org.apache.spark.{SparkConf, SparkContext}
 2 
 3 object Spark02RddParallelizeSet {
 4   def main(args: Array[String]): Unit = {
 5     System.setProperty("hadoop.home.dir", "C:\\Hadoop\\")
 6     val spark = new SparkConf().setMaster("local[*]").setAppName("RddParallelizeSet")
 7     val context = new SparkContext(spark)
 8 
 9     val list = List(1, 2, 3, 4, 5)
10 
11     // TODO: 從記憶體建立RDD,並且設定並行執行的任務數量
12     // numSlices: Int = defaultParallelism
13     val memoryRDD = context.makeRDD(list, 4)
14     memoryRDD.collect().foreach(println)
15 
16     // TODO: 結束
17     context.stop()
18   }
19 }

我們在一層層進入Spark原始碼,最終可以檢視到關於RDD並行度的相關資訊:

我們可以在這個實現方法裡看到 scheduler.conf.getInt(引數一,引數二),引數一是spark配置檔案裡的一個配置項,引數二的意思是本地機器的cpu核數。排程程式是從spark的配置檔案裡讀取了 spark.default.parallelism這個配置。如果沒有讀取到這個配置的話,則並行度設定將會與本地機器的cpu核數一樣。

現在回到我們自己寫的程式裡,在建立spark配置例項的時候,我們其實已經在設定要用多少個本地機器的核數了:

setMaster()裡面的 local[*],代表的含義是本地機器cpu有多少核,在排程的時候就用到多少核。當然我們也可以設定其它數字,如果你想這樣做的話。現在,我們大致可以理解設定並行度是怎麼一回事了。 最後,原始碼是個好東西,多去看優秀的原始碼,很多不清楚的地方都能夠迎刃而解。 加油,共勉!