Spark ~ RDD 序列化
技術標籤:Spark
Spark ~ RDD 序列化
案列,沒有經過系列化的情況
package org.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object Kryo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark" ).setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
var conf = new SparkConf().setMaster("local").setAppName("cai")
var sc = new SparkContext(conf)
var rdd = sc.parallelize(1 to 9)
var user = new User
//rdd 運算元中傳遞的函式如果包括閉包操作的,就會進行檢測功能檢測變數有沒有序列化
//閉包檢測
rdd.foreach(x=>{
println("age is:"+ (x.toInt + user.age))
})
}
class User {
var age : Int = 30
}
}
此時會報錯:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable( ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD.$anonfun$foreach$1(RDD.scala:971)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
at org.example.Kryo$.main(Kryo.scala:18)
at org.example.Kryo.main(Kryo.scala)
Caused by: java.io.NotSerializableException: org.example.Kryo$User
Serialization stack:
- object not serializable (class: org.example.Kryo$User, value: org.example.Kryo$User@76304b46)
- field (class: scala.runtime.ObjectRef, name: elem, type: class java.lang.Object)
- object (class scala.runtime.ObjectRef, org.example.Kryo$User@76304b46)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 1)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.example.Kryo$, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcVI$sp.apply$mcVI$sp:(I)V, implementation=invokeStatic org/example/Kryo$.$anonfun$main$1:(Lscala/runtime/ObjectRef;I)V, instantiatedMethodType=(I)V, numCaptured=1])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.example.Kryo$$$Lambda$527/1691629865, org.example.Kryo$$$Lambda$527/1691629865@43d455c9)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 10 more
Process finished with exit code 1
Exception in thread “main” org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.example.Kryo$User
報錯資訊顯示沒有進行序列化,User沒有進行序列化。
原因分析:
從計算的角度, 運算元以外的程式碼都是在 Driver 端執行, 運算元裡面的程式碼都是在 Executor
端執行。那麼在 scala 的函數語言程式設計中,就會導致運算元內經常會用到運算元外的資料,這樣就形成了閉包的效果,如果使用的運算元外的資料無法序列化,就意味著無法傳值給 Executor端執行,就會發生錯誤,所以需要在執行任務計算前,檢測閉包內的物件是否可以進行序列化,這個操作我們稱之為閉包檢測。Scala2.12 版本後閉包編譯方式發生了改變
進行序列化
extends Serializable
使用 extends Serializable 來進行序列化。
class User extends Serializable {
var age : Int = 30
}
或者使用以下的方式來進行序列化
case class User() {
var age : Int = 30
}
Kryo
Java 的序列化能夠序列化任何的類。但是比較重(位元組多),序列化後,物件的提交也
比較大。Spark 出於效能的考慮,Spark2.0 開始支援另外一種 Kryo 序列化機制。Kryo 速度是 Serializable 的 10 倍。當 RDD 在 Shuffle 資料的時候,簡單資料型別、陣列和字串型別已經在 Spark 內部使用 Kryo 來序列化。
注意:即使使用 Kryo 序列化,也要繼承 Serializable 介面
var conf = new SparkConf().setMaster(“local”).setAppName(“cai”)
// 替換預設的序列化機制
.set(“spark.serializer”,“org.apache.spark.serializer.KryoSerializer”)
// 註冊需要使用 kryo 序列化的自定義類
.registerKryoClasses(Array(classOf[User]))
package org.example
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
object Kryo {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
var conf = new SparkConf().setMaster("local").setAppName("cai")
// 替換預設的序列化機制
.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
// 註冊需要使用 kryo 序列化的自定義類
.registerKryoClasses(Array(classOf[User]))
var sc = new SparkContext(conf)
var rdd = sc.parallelize(1 to 9)
var user = new User
//rdd 運算元中傳遞的函式如果包括閉包操作的,就會進行檢測功能檢測變數有沒有序列化
//閉包檢測
rdd.foreach(x=>{
println("age is:"+ (x.toInt + user.age))
})
}
case class User() {
var age : Int = 30
}
}