Spark閉包與序列化
Spark的官方文件再三強調那些將要作用到RDD上的操作,不管它們是一個函式還是一段程式碼片段,它們都是“閉包”,Spark會把這個閉包分發到各個worker節點上去執行,這裡涉及到了一個容易被忽視的問題:閉包的“序列化”。
顯然,閉包是有狀態的,這主要是指它牽涉到的那些自由變數以及自由變數依賴到的其他變數,所以,在將一個簡單的函式或者一段程式碼片段(就是閉包)傳遞給類似RDD.map這樣的操作前,Spark需要檢索閉包內所有的涉及到的變數(包括傳遞依賴的變數),正確地把這些變數序列化之後才能傳遞到worker節點並反序列化去執行。如果在涉及到的所有的變數中有任何不支援序列化或沒有指明如何序列化自己時,你就會遇到這樣的錯誤:
org.apache.spark.SparkException: Task not serializable
在下面的例子中,我們從kafka中持續地接收json訊息,並在spark-streaming中將字串解析成對應的實體:
object App {
private val config = ConfigFactory.load("my-streaming.conf")
case class Person (firstName: String,lastName: String)
def main(args: Array[String]) {
val zkQuorum = config.getString("kafka.zkQuorum")
val myTopic = config.getString("kafka.myTopic")
val myGroup = config.getString("kafka.myGroup")
val conf = new SparkConf().setAppName("my-streaming")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1 ))
//this val is a part of closure, and it's not serializable!
implicit val formats = DefaultFormats
def parser(json: String) = parse(json).extract[Person].firstName
lines.map(_._2).map(parser).print
....
ssc.start()
ssc.awaitTerminationOrTimeout(10000)
ssc.stop()
}
}
這段程式碼在執行時就會報如下錯誤:
org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.json4s.DefaultFormats$
問題的癥結就在於:閉包沒有辦法序列化。在這個例子裡,閉包的範圍是:函式parser以及它所依賴的一個隱式引數: formats , 而問題就出在這個隱式引數上, 它的型別是DefaultFormats,這個類沒有提供序列化和反序列自身的說明,所以Spark無法序列化formats,進而無法將task推送到遠端執行。
隱式引數formats是為extract準備的,它的引數列表如下:
org.json4s.ExtractableJsonAstNode#extract[A](implicit formats: Formats, mf: scala.reflect.Manifest[A]): A = ...
找到問題的根源之後就好解決了。實際上我們根本不需要序列化formats, 對我們來說,它是無狀態的。所以,我們只需要把它宣告為一個全域性靜態的變數就可以繞過序列化。所以改動的方法就是簡單地把implicit val formats = DefaultFormats
的宣告從方法內部遷移到App Object的欄位位置上即可。
object App {
private val config = ConfigFactory.load("my-streaming.conf")
case class Person (firstName: String,lastName: String)
//As Object field, global, static, no need to serialize
implicit val formats = DefaultFormats
def main(args: Array[String]) {
val zkQuorum = config.getString("kafka.zkQuorum")
val myTopic = config.getString("kafka.myTopic")
val myGroup = config.getString("kafka.myGroup")
val conf = new SparkConf().setAppName("my-streaming")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = KafkaUtils.createStream(ssc, zkQuorum, myGroup, Map(myTopic -> 1))
def parser(json: String) = parse(json).extract[Person].firstName
lines..map(_._2).map(parser).print
....
ssc.start()
ssc.awaitTerminationOrTimeout(10000)
ssc.stop()
}
}
這裡再提供另外一個很好的例子:
這個例子很好演示瞭解決類似問題的方案:“把類成員變數拷貝一份到閉包中” ,不然整個物件都需要被序列化!
最後我們來總結一下應該如何正確的處理Spark Task閉包的序列化問題。首先你需要對Task涉及的閉包的邊界要有一個清晰的認識,要儘量地控制閉包的範圍和牽涉到的自由變數,一個非常值得警惕的地方是:儘量不要在閉包中直接引用一個類的成員變數和函式,這樣會導致整個類例項被序列化。這樣的例子在Spark文件中也有提及,如下:
class MyClass {
def func1(s: String): String = { ... }
def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
然後,一個好的組織程式碼的方式是:除了那些很短小的函式,儘量把複雜的操作封裝到全域性單一的函式體:全域性靜態方法或者函式物件
如果確實需要某個類的例項參與到計算過程中,則要作好相關的序列化工作。