1. 程式人生 > 其它 >Spark ~ RDD 序列化

Spark ~ RDD 序列化

技術標籤:Spark

Spark ~ RDD 序列化

案列,沒有經過系列化的情況

package org.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object Kryo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark"
).setLevel(Level.WARN) Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF) var conf = new SparkConf().setMaster("local").setAppName("cai") var sc = new SparkContext(conf) var rdd = sc.parallelize(1 to 9) var user = new User //rdd 運算元中傳遞的函式如果包括閉包操作的,就會進行檢測功能檢測變數有沒有序列化
//閉包檢測 rdd.foreach(x=>{ println("age is:"+ (x.toInt + user.age)) }) } class User { var age : Int = 30 } }

此時會報錯:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(
ClosureCleaner.scala:403) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:971) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.foreach(RDD.scala:970) at org.example.Kryo$.main(Kryo.scala:18) at org.example.Kryo.main(Kryo.scala) Caused by: java.io.NotSerializableException: org.example.Kryo$User Serialization stack: - object not serializable (class: org.example.Kryo$User, value: org.example.Kryo$User@76304b46) - field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object) - object (class scala.runtime.ObjectRef, org.example.Kryo$User@76304b46) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.example.Kryo$, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcVI$sp.apply$mcVI$sp:(I)V, implementation=invokeStatic org/example/Kryo$.$anonfun$main$1:(Lscala/runtime/ObjectRef;I)V, instantiatedMethodType=(I)V, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class org.example.Kryo$$$Lambda$527/1691629865, org.example.Kryo$$$Lambda$527/1691629865@43d455c9) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400) ... 10 more Process finished with exit code 1

Exception in thread “main” org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.example.Kryo$User
報錯資訊顯示沒有進行序列化,User沒有進行序列化。

原因分析:

從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor
端執行。
那麼在 scala 的函數語言程式設計中,就會導致運算元內經常會用到運算元外的資料,這樣就形成了閉包的效果,如果使用的運算元外的資料無法序列化,就意味著無法傳值給 Executor端執行,就會發生錯誤,所以需要在執行任務計算前,檢測閉包內的物件是否可以進行序列化,這個操作我們稱之為閉包檢測。Scala2.12 版本後閉包編譯方式發生了改變

進行序列化

extends Serializable

使用 extends Serializable 來進行序列化。

  class User extends Serializable {
    var age : Int = 30
  }

或者使用以下的方式來進行序列化

  case class User() {
    var age : Int = 30
  }

Kryo

Java 的序列化能夠序列化任何的類。但是比較重(位元組多),序列化後,物件的提交也
比較大。Spark 出於效能的考慮,Spark2.0 開始支援另外一種 Kryo 序列化機制。Kryo 速度是 Serializable 的 10 倍。當 RDD 在 Shuffle 資料的時候,簡單資料型別、陣列和字串型別已經在 Spark 內部使用 Kryo 來序列化。
注意:即使使用 Kryo 序列化,也要繼承 Serializable 介面

var conf = new SparkConf().setMaster(“local”).setAppName(“cai”)
// 替換預設的序列化機制
.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”)
// 註冊需要使用 kryo 序列化的自定義類
.registerKryoClasses(Array(classOf[User]))

package org.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object Kryo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    var conf = new SparkConf().setMaster("local").setAppName("cai")
              // 替換預設的序列化機制
              .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
              // 註冊需要使用 kryo 序列化的自定義類
              .registerKryoClasses(Array(classOf[User]))
    var sc = new SparkContext(conf)

    var rdd = sc.parallelize(1 to 9)

    var user = new User

    //rdd 運算元中傳遞的函式如果包括閉包操作的,就會進行檢測功能檢測變數有沒有序列化
    //閉包檢測
    rdd.foreach(x=>{
      println("age is:"+ (x.toInt + user.age))
    })
  }

  case class User() {
    var age : Int = 30
  }
}