Spark序列化入門
什麼是序列化和序列化?
- 序列化是什麼
1. 序列化的作用就是可以將物件的內容變成二進位制, 存入檔案中儲存
2. 反序列化指的是將儲存下來的二進位制物件資料恢復成物件 - 序列化對物件的要求
1. 物件必須實現Serializable
介面
2. 物件中的所有屬性必須都要可以被序列化, 如果出現無法被序列化的屬性, 則序列化失敗 - 限制
1. 物件被序列化後, 生成的二進位制檔案中, 包含了很多環境資訊, 如物件頭, 物件中的屬性欄位等, 所以內容相對較大
2. 因為資料量大, 所以序列化和反序列化的過程比較慢 - 序列化的應用場景
1. 持久化物件資料
2. 網路中不能傳輸Java
在 Spark
中的序列化和反序列化的應用場景
Task
分發
Task
是一個物件, 想在網路中傳輸物件就必須要先序列化
-
RDD
快取val rdd1 = rdd.flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) rdd1.cache rdd1.collect
RDD
中處理的是物件, 例如說字串,Person
物件等- 如果快取
RDD
中的資料, 就需要快取這些物件 - 物件是不能存在檔案中的, 必須要將物件序列化後, 將二進位制資料存入檔案
-
廣播變數
-
廣播變數會分發到不同的機器上, 這個過程中需要使用網路, 物件在網路中傳輸就必須先被序列化
-
Shuffle
過程
-
Shuffle
過程是由Reducer
從Mapper
中拉取資料, 這裡面涉及到兩個需要序列化物件的原因RDD
中的資料物件需要在Mapper
端落盤快取, 等待拉取Mapper
和Reducer
要傳輸資料物件
-
Spark Streaming
的Receiver
-
Spark Streaming
中獲取資料的元件叫做Receiver
, 獲取到的資料也是物件形式, 在獲取到以後需要落盤暫存, 就需要對資料物件進行序列化 -
運算元引用外部物件
class
- 在
Map
運算元的函式中, 傳入了一個Unserializable
的物件 Map
運算元的函式是會在整個叢集中執行的, 那Unserializable
物件就需要跟隨Map
運算元的函式被傳輸到不同的節點上- 如果
Unserializable
不能被序列化, 則會報錯
- 在
RDD 的序列化
-
RDD
的序列化RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器
-
為什麼?
RDD 中存放的是資料物件, 要保留所有的資料就必須要對物件的元資訊進行儲存, 例如物件頭之類的 儲存一整個物件, 記憶體佔用和效率會比較低一些
-
Kryo
是什麼
Kryo
是Spark
引入的一個外部的序列化工具, 可以增快RDD
的執行速度 因為Kryo
序列化後的物件更小, 序列化和反序列化的速度非常快 在RDD
中使用Kryo
的過程如下
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("KyroTest")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array(classOf[Person]))
val sc = new SparkContext(conf)
rdd.map(arr => Person(arr(0), arr(1), arr(2)))
DataFrame
和 Dataset
中的序列化
歷史的問題
RDD
中無法感知資料的組成, 無法感知資料結構, 只能以物件的形式處理資料
DataFrame
和 Dataset
的特點
-
DataFrame
和Dataset
是為結構化資料優化的 -
在
DataFrame
和Dataset
中, 資料和資料的Schema
是分開儲存的spark.read .csv("...") .where($"name" =!= "") .groupBy($"name") .map(row: Row => row) .show()
-
DataFrame
中沒有資料物件這個概念, 所有的資料都以行的形式存在於Row
物件中,Row
中記錄了每行資料的結構, 包括列名, 型別等
Dataset
中上層可以提供有型別的 API
, 用以操作資料, 但是在內部, 無論是什麼型別的資料物件 Dataset
都使用一個叫做 InternalRow
的型別的物件儲存資料
val dataset: Dataset[Person] = spark.read.csv(...).as[Person]
總結
- 當需要將物件快取下來的時候, 或者在網路中傳輸的時候, 要把物件轉成二進位制, 在使用的時候再將二進位制轉為物件, 這個過程叫做序列化和反序列化
- 在
Spark
中有很多場景需要儲存物件, 或者在網路中傳輸物件Task
分發的時候, 需要將任務序列化, 分發到不同的Executor
中執行- 快取
RDD
的時候, 需要儲存RDD
中的資料 - 廣播變數的時候, 需要將變數序列化, 在叢集中廣播
RDD
的Shuffle
過程中Map
和Reducer
之間需要交換資料- 運算元中如果引入了外部的變數, 這個外部的變數也需要被序列化
RDD
因為不保留資料的元資訊, 所以必須要序列化整個物件, 常見的方式是Java
的序列化器, 和Kyro
序列化器Dataset
和DataFrame
中保留資料的元資訊, 所以可以不再使用Java
的序列化器和Kyro
序列化器, 使用Spark
特有的序列化協議, 生成UnsafeInternalRow
用以儲存資料, 這樣不僅能減少資料量, 也能減少序列化和反序列化的開銷, 其速度大概能達到RDD
的序列化的20
倍左右