1. 程式人生 > 其它 >第五章_Spark核心程式設計_Rdd_閉包檢測&物件序列化

第五章_Spark核心程式設計_Rdd_閉包檢測&物件序列化

1. 說明

  /*閉包檢查*/
  /*
  * 1. Scala的閉包
  *     如果一個函式,訪問了它外部的(區域性)變數的值,那麼這個函式和所處的環境,稱之為閉包
  *     使用場景 :
  *         在巢狀函式中,內層函式可以 只用外層函式的任意變數
  *
  * 2. Spark的閉包
  *     1. 運算元之外的程式碼都是在Driver端執行,運算元裡面的程式碼都是在Executor端執行
  *     2. 在Spark中運算元內的方法中經常用到 運算元外的資料,這樣就形參了閉包效果
  *
  * 3. Spark的閉包檢查
  *     1. 當運算元內的方法 使用運算元外的資料時,會檢查所用的資料是否可以被序列化
  *
  * 4. note
  *    由於Driver 需要將rdd的運算元(計算規則)分發到不同的計算節點(Executor)上去執行
  *    ,如果被分發的計算規則中,使用到了Driver的物件,就需要將該物件序列化後也分發到
  *    相應的Executor上去
  *
  * 5. 物件怎樣才能序列化?
  *    1.繼承 Serializable
  *    2.case 修飾類(樣例類)
  *
  * 6.關於 Serializable 和 Kryo 序列化框架
  *     Serializable : Java原生序列化框架,可以序列化任何類,但是比較重(序列化後位元組數較多),
  *                    不利於網路io
  *     Kryo : Kryo 速度 是 Serializable 的 10 倍,從Spark2.0開始 shuffle資料時,簡單的資料型別
  *            和陣列和字串型別已經使用Kryo
  *     note :
  *          1. 如果自定義的類也想使用Kryo,需要在Driver中註冊
  *          2. 即是使用Kryo序列化,也要繼承 Serializable介面
  *
  * 
*/

2.示例

  /*Spark 閉包檢查示例*/
  object foreachTest extends App {

    val sparkconf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("distinctTest")
      // 替換預設的序列化機制
      .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[comString]))

    
// 註冊需要使用 kryo 序列化的自定義類 val sc: SparkContext = new SparkContext(sparkconf) //初始化一個Rdd val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "java")) //初始化comString 物件 var com = new comString(10, "大王") //檢查 引數 com物件是否能夠被 序列化 private val rdd1 = rdd.map( (_, com) )
//檢查 引數 com.id、com.name 可否被序列化 private val rdd2 = rdd.map( (_, com.id, com.name) ) rdd1.collect().foreach(println(_)) sc.stop() } //1.繼承 Serializable class comString(val id: Int, val name: String) extends Serializable case class comStringCase(val id: Int, val name: String)