1. 程式人生 > >Spark序列化問題

Spark序列化問題

本文主要從以下三個方面解釋Spark 應用中序列化問題 。
1、Java序列化含義。
2、Spark程式碼為什麼需要序列化。
3、如何解決Spark序列化問題。
1、Java序列化含義。
Spark是基於JVM執行的進行,其序列化必然遵守Java的序列化規則。
序列化就是指將一個物件轉化為二進位制的byte流(注意,不是bit流),然後以檔案的方式進行儲存或通過網路傳輸,等待被反序列化讀取出來。序列化常被用於資料存取和通訊過程中。
對於java應用實現序列化一般方法:
class實現序列化操作是讓class 實現Serializable介面,但實現該介面不保證該class一定可以序列化,因為序列化必須保證該class引用的所有屬性可以序列化。
這裡需要明白,static和transient修飾的變數不會被序列化,這也是解決序列化問題的方法之一,讓不能序列化的引用用static和transient來修飾。(static修飾的是類的狀態,而不是物件狀態,所以不存在序列化問題。transient修飾的變數,是不會被序列化到檔案中,在被反序列化後,transient變數的值被設為初始值,如int是0,物件是null)
此外還可以實現readObject()方法和writeObject()方法來自定義實現序列化。
2、Spark的transformation操作為什麼需要序列化。
Spark是分散式執行引擎,其核心抽象是彈性分散式資料集RDD,其代表了分佈在不同節點的資料。Spark的計算是在executor上分散式執行的,故使用者開發的關於RDD的map,flatMap,reduceByKey等transformation 操作(閉包)有如下執行過程:

程式碼中物件在driver本地序列化
物件序列化後傳輸到遠端executor節點
遠端executor節點反序列化物件
最終遠端節點執行
故物件在執行中需要序列化通過網路傳輸,則必須經過序列化過程。

3、如何解決Spark序列化問題。
如果出現NotSerializableException報錯,可以在spark-default.xml檔案中加入如下引數來開啟SerializationDebugger功能類,從而可以在日誌中打印出序列化出問題的類和屬性資訊。
spark.executor.extraJavaOptions -Dsun.io.serialization.extendedDebugInfo=true spark.driver.extraJavaOption -Dsun.io.serialization.extendedDebugInfo=true
對於scala語言開發,解決序列化問題主要如下幾點: 在Object中宣告物件 (每個class對應有一個Object) 如果在閉包中使用SparkContext或者SqlContext,建議使用SparkContext.get() and SQLContext.getActiveOrCreate() 使用static或transient修飾不可序列化的屬性從而避免序列化。
注:scala語言中,class的Object
對於java語言開發,對於不可序列化物件,如果本身不需要儲存或傳輸,則可使用static或trarnsient修飾;如果需要儲存傳輸,則實現writeObject()/readObject()使用自定義序列化方法。 此外注意
對於Spark Streaming作業,注意哪些操作在driver,哪些操作在executor。因為在driver端(foreachRDD)例項化的物件,很可能不能在foreach中執行,因為物件不能從driver序列化傳遞到executor端(有些物件有TCP連結,一定不可以序列化)。所以這裡一般在foreachPartitions或foreach運算元中來例項化物件,這樣物件在executor端例項化,沒有從driver傳輸到executor的過程。
dstream.foreachRDD { rdd => val where1 = “on the driver” rdd.foreach { record => val where2 = “on different executors” } } }

作者:從0到1哦
來源:CSDN
原文:https://blog.csdn.net/weixin_38653290/article/details/84503295
版權宣告:本文為博主原創文章,轉載請附上博文連結!