第五章_Spark核心程式設計_Rdd_閉包檢測&物件序列化
阿新 • • 發佈:2022-04-01
1. 說明
/*閉包檢查*/ /* * 1. Scala的閉包 * 如果一個函式,訪問了它外部的(區域性)變數的值,那麼這個函式和所處的環境,稱之為閉包 * 使用場景 : * 在巢狀函式中,內層函式可以 只用外層函式的任意變數 * * 2. Spark的閉包 * 1. 運算元之外的程式碼都是在Driver端執行,運算元裡面的程式碼都是在Executor端執行 * 2. 在Spark中運算元內的方法中經常用到 運算元外的資料,這樣就形參了閉包效果 * * 3. Spark的閉包檢查 * 1. 當運算元內的方法 使用運算元外的資料時,會檢查所用的資料是否可以被序列化 * * 4. note * 由於Driver 需要將rdd的運算元(計算規則)分發到不同的計算節點(Executor)上去執行 * ,如果被分發的計算規則中,使用到了Driver的物件,就需要將該物件序列化後也分發到 * 相應的Executor上去 * * 5. 物件怎樣才能序列化? * 1.繼承 Serializable * 2.case 修飾類(樣例類) * * 6.關於 Serializable 和 Kryo 序列化框架 * Serializable : Java原生序列化框架,可以序列化任何類,但是比較重(序列化後位元組數較多), * 不利於網路io * Kryo : Kryo 速度 是 Serializable 的 10 倍,從Spark2.0開始 shuffle資料時,簡單的資料型別 * 和陣列和字串型別已經使用Kryo * note : * 1. 如果自定義的類也想使用Kryo,需要在Driver中註冊 * 2. 即是使用Kryo序列化,也要繼承 Serializable介面 * **/
2.示例
/*Spark 閉包檢查示例*/ object foreachTest extends App { val sparkconf: SparkConf = new SparkConf() .setMaster("local") .setAppName("distinctTest") // 替換預設的序列化機制 .set("spark.serializer","org.apache.spark.serializer.KryoSerializer") .registerKryoClasses(Array(classOf[comString]))// 註冊需要使用 kryo 序列化的自定義類 val sc: SparkContext = new SparkContext(sparkconf) //初始化一個Rdd val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "java")) //初始化comString 物件 var com = new comString(10, "大王") //檢查 引數 com物件是否能夠被 序列化 private val rdd1 = rdd.map( (_, com) )//檢查 引數 com.id、com.name 可否被序列化 private val rdd2 = rdd.map( (_, com.id, com.name) ) rdd1.collect().foreach(println(_)) sc.stop() } //1.繼承 Serializable class comString(val id: Int, val name: String) extends Serializable case class comStringCase(val id: Int, val name: String)