關於Spark RDD 的認識
阿新 • • 發佈:2020-08-19
一、基本認識
RDD是Spark大資料計算引擎中,抽象的一種資料結構。
RDD(Resilient Distributed Dataset),中文意思是彈性分散式資料集,它是Spark中的基本抽象。在Spark原始碼中,有下面的註釋:
RDD 有五個主要的屬性:
-
A list of partitions (分割槽列表)
-
A function for computing each split (分割槽計算函式) 相同的計算邏輯應用在不同的分割槽中
-
A list of dependencies on other RDDs (多個RDD之間存在依賴關係)
-
Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (對鍵值對型別的資料進行分割槽)
-
Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file) (首選位置,計算資料的位置)
- 啟動Yarn叢集資源
- spark申請資源,建立排程節點和計算節點
- 根據需求,spark把計算邏輯,根據分割槽,劃分成不同的任務
- 排程節點把任務根據計算節點的狀態,傳送到對應的計算節點上進行計算
1 import org.apache.spark.rdd.RDD 2 import org.apache.spark.{SparkConf, SparkContext} 3 4 object Spark01RddCreate {5 def main(args: Array[String]): Unit = { 6 System.setProperty("hadoop.home.dir", "C:\\Hadoop\\") 7 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark01rddmemory") 8 val sc = new SparkContext(sparkConf) 9 10 // TODO: Spark 從記憶體中建立RDD 11 val list = List(1, 2, 3, 4) 12 val rdd1 = sc.parallelize(list) 13 val rdd2 = sc.makeRDD(list) 14 rdd1.collect().foreach(println) 15 rdd2.collect().foreach(println) 16 17 // TODO: Spark 從外部檔案中建立RDD 18 val sc_text: RDD[String] = sc.textFile("G:\\SNBRecommendSys\\recommender\\DataLoader\\src\\main\\input_data") 19 System.out.println("從外部檔案中建立RDD:\n") 20 sc_text.collect().foreach(println) 21 22 // TODO: Spark 從其他RDD建立RDD 23 val flatRDD = sc_text.flatMap(line => { 24 line.split(" ") 25 }) 26 System.out.println("從其他RDD建立RDD:\n") 27 flatRDD.collect().foreach(println) 28 29 sc.stop() 30 } 31 }
2、關於RDD並行度的理解
Spark將一個作業切分為多個任務後,會發送給Excutor節點平行計算,能夠平行計算的任務數量就是並行度。計算的任務數量可以在建立RDD的時候去指定。 RDD中,分割槽的數量就是RDD的並行度,設定並行度就是設定分割槽的數量。 下面程式碼中,我們可以看到設定並行度,就是在建立RDD的時候,傳入的第二個引數值1 import org.apache.spark.{SparkConf, SparkContext} 2 3 object Spark02RddParallelizeSet { 4 def main(args: Array[String]): Unit = { 5 System.setProperty("hadoop.home.dir", "C:\\Hadoop\\") 6 val spark = new SparkConf().setMaster("local[*]").setAppName("RddParallelizeSet") 7 val context = new SparkContext(spark) 8 9 val list = List(1, 2, 3, 4, 5) 10 11 // TODO: 從記憶體建立RDD,並且設定並行執行的任務數量 12 // numSlices: Int = defaultParallelism 13 val memoryRDD = context.makeRDD(list, 4) 14 memoryRDD.collect().foreach(println) 15 16 // TODO: 結束 17 context.stop() 18 } 19 }
我們在一層層進入Spark原始碼,最終可以檢視到關於RDD並行度的相關資訊:
我們可以在這個實現方法裡看到 scheduler.conf.getInt(引數一,引數二),引數一是spark配置檔案裡的一個配置項,引數二的意思是本地機器的cpu核數。排程程式是從spark的配置檔案裡讀取了 spark.default.parallelism這個配置。如果沒有讀取到這個配置的話,則並行度設定將會與本地機器的cpu核數一樣。
現在回到我們自己寫的程式裡,在建立spark配置例項的時候,我們其實已經在設定要用多少個本地機器的核數了:
setMaster()裡面的 local[*],代表的含義是本地機器cpu有多少核,在排程的時候就用到多少核。當然我們也可以設定其它數字,如果你想這樣做的話。現在,我們大致可以理解設定並行度是怎麼一回事了。 最後,原始碼是個好東西,多去看優秀的原始碼,很多不清楚的地方都能夠迎刃而解。 加油,共勉!