Spark中的RDD快取----cache
阿新 • • 發佈:2022-03-13
Spark中的RDD快取----cache
目錄前面說到,spark中的RDD是沒有資料的,因為資料流過而不留下,
有時候對同一個RDD我們需要使用多次,每次使用該RDD,資料都要重新呼叫,非常麻煩;
這時候我們可以通過對RDD進行快取,將RDD快取在記憶體或者磁盤裡面,
這樣就使RDD中含有資料了。
RDD快取程式碼示例
package com.shujia.spark import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} object Demo23Cache { def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf() .setAppName("Demo23Cache") .setMaster("local") val sc = new SparkContext(conf) //讀取檔案建立RDD val studentsRDD: RDD[String] = sc.textFile("data/students.txt") //拆分資料 val studentsTupleRDD: RDD[(String, String, Int, String, String)] = studentsRDD.map(stu => { val split: Array[String] = stu.split(",") (split(0), split(1), split(2).toInt, split(3), split(4)) //整理成元組的形式輸出 }) /** * 當同一個RDD多次使用的時候,可以將這個RDD快取---cache */ studentsTupleRDD.cache() /** * 統計班級人數 */ //取出班級 val clazzRDD: RDD[(String, Int)] = studentsTupleRDD.map{ case (id:String, name:String, age:Int, gender:String, clazz:String)=> (clazz,1) } /* 在case匹配的時候,只用到了一個列,其他的沒用上,那麼其他的列可以用下劃線代替 case (_:String, _:String, _:Int, _:String, _:String)=> */ //按照key聚合value val clazzNumRDD: RDD[(String, Int)] = clazzRDD.reduceByKey((x, y) => x + y) clazzNumRDD.foreach(println) //(理科二班,79) //(文科三班,94) /** * 統計性別人數 */ val genderRDD: RDD[(String, Int)] = studentsTupleRDD.map{ case (_:String, _:String, _:Int, gender:String, _:String) => (gender,1) } val genderNumRDD: RDD[(String, Int)] = genderRDD.reduceByKey((x, y) => x + y) genderNumRDD.foreach(println) } }
第一個action運算元,就觸發了一次任務,程式開始從頭到尾執行一遍--->統計了班級人數;
第二個action運算元,也觸發了一次任務,程式也開始從頭到尾執行一遍-->統計了性別人數;
studentsTupeRDD之前的包括自身都被執行了兩遍
新增RDD快取
cache的流程: 當第一個Job在執行的時候發現RDD執行cache,會將RDD的資料快取起來; 下一個Job如果再使用這個RDD,可以直接從快取中讀取資料,不需要重新計算這個RDD; 快取的資料在Executor中, 預設快取的級別MEMORY_ONLY,只存記憶體 預設值 --executor-memory 512m--num-executor 2 總的記憶體是512*2=1G,快取可以使用0.6(600M), 如果RDD的資料超過了600M,會出現記憶體溢位