1. 程式人生 > 其它 >Spark序列化入門

Spark序列化入門

技術標籤:sparkspark序列化rdd

什麼是序列化和序列化?

  • 序列化是什麼
    1. 序列化的作用就是可以將物件的內容變成二進位制, 存入檔案中儲存
    2. 反序列化指的是將儲存下來的二進位制物件資料恢復成物件
  • 序列化對物件的要求
    1. 物件必須實現 Serializable 介面
    2. 物件中的所有屬性必須都要可以被序列化, 如果出現無法被序列化的屬性, 則序列化失敗
  • 限制
    1. 物件被序列化後, 生成的二進位制檔案中, 包含了很多環境資訊, 如物件頭, 物件中的屬性欄位等, 所以內容相對較大
    2. 因為資料量大, 所以序列化和反序列化的過程比較慢
  • 序列化的應用場景
    1. 持久化物件資料
    2. 網路中不能傳輸 Java
    物件, 只能將其序列化後傳輸二進位制資料

Spark 中的序列化和反序列化的應用場景

  • Task 分發

在這裡插入圖片描述

Task 是一個物件, 想在網路中傳輸物件就必須要先序列化

  • RDD 快取

    val rdd1 = rdd.flatMap(_.split(" "))
       .map((_, 1))
       .reduceByKey(_ + _)
    rdd1.cache
    rdd1.collect
    
    • RDD 中處理的是物件, 例如說字串, Person 物件等
    • 如果快取 RDD 中的資料, 就需要快取這些物件
    • 物件是不能存在檔案中的, 必須要將物件序列化後, 將二進位制資料存入檔案
  • 廣播變數

在這裡插入圖片描述

  • 廣播變數會分發到不同的機器上, 這個過程中需要使用網路, 物件在網路中傳輸就必須先被序列化

  • Shuffle 過程

在這裡插入圖片描述

  • Shuffle 過程是由 ReducerMapper 中拉取資料, 這裡面涉及到兩個需要序列化物件的原因

    • RDD 中的資料物件需要在 Mapper 端落盤快取, 等待拉取
    • MapperReducer 要傳輸資料物件
  • Spark StreamingReceiver

在這裡插入圖片描述

  • Spark Streaming 中獲取資料的元件叫做 Receiver, 獲取到的資料也是物件形式, 在獲取到以後需要落盤暫存, 就需要對資料物件進行序列化

  • 運算元引用外部物件

    class
    userserializable(i: Int) rdd.map(i => new Unserializable(i)) .collect .foreach(println)
    • Map 運算元的函式中, 傳入了一個 Unserializable 的物件
    • Map 運算元的函式是會在整個叢集中執行的, 那 Unserializable 物件就需要跟隨 Map 運算元的函式被傳輸到不同的節點上
    • 如果 Unserializable 不能被序列化, 則會報錯

RDD 的序列化

在這裡插入圖片描述

  • RDD 的序列化

    RDD 的序列化只能使用 Java 序列化器, 或者 Kryo 序列化器

  • 為什麼?

    RDD 中存放的是資料物件, 要保留所有的資料就必須要對物件的元資訊進行儲存, 例如物件頭之類的 儲存一整個物件, 記憶體佔用和效率會比較低一些

  • Kryo 是什麼

    KryoSpark 引入的一個外部的序列化工具, 可以增快 RDD 的執行速度 因為 Kryo 序列化後的物件更小, 序列化和反序列化的速度非常快 在 RDD 中使用 Kryo 的過程如下

  val conf = new SparkConf()
    .setMaster("local[2]")
    .setAppName("KyroTest")
  
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  conf.registerKryoClasses(Array(classOf[Person]))
  
  val sc = new SparkContext(conf)
  
  rdd.map(arr => Person(arr(0), arr(1), arr(2)))

DataFrameDataset 中的序列化

歷史的問題

RDD 中無法感知資料的組成, 無法感知資料結構, 只能以物件的形式處理資料

DataFrameDataset 的特點

  • DataFrameDataset 是為結構化資料優化的

  • DataFrameDataset 中, 資料和資料的 Schema 是分開儲存的

    spark.read
         .csv("...")
         .where($"name" =!= "")
         .groupBy($"name")
         .map(row: Row => row)
         .show()
    
  • DataFrame 中沒有資料物件這個概念, 所有的資料都以行的形式存在於 Row 物件中, Row 中記錄了每行資料的結構, 包括列名, 型別等
    在這裡插入圖片描述

Dataset 中上層可以提供有型別的 API, 用以操作資料, 但是在內部, 無論是什麼型別的資料物件 Dataset 都使用一個叫做 InternalRow 的型別的物件儲存資料

val dataset: Dataset[Person] = spark.read.csv(...).as[Person]

總結

  1. 當需要將物件快取下來的時候, 或者在網路中傳輸的時候, 要把物件轉成二進位制, 在使用的時候再將二進位制轉為物件, 這個過程叫做序列化和反序列化
  2. Spark 中有很多場景需要儲存物件, 或者在網路中傳輸物件
    1. Task 分發的時候, 需要將任務序列化, 分發到不同的 Executor 中執行
    2. 快取 RDD 的時候, 需要儲存 RDD 中的資料
    3. 廣播變數的時候, 需要將變數序列化, 在叢集中廣播
    4. RDDShuffle 過程中 MapReducer 之間需要交換資料
    5. 運算元中如果引入了外部的變數, 這個外部的變數也需要被序列化
  3. RDD 因為不保留資料的元資訊, 所以必須要序列化整個物件, 常見的方式是 Java 的序列化器, 和 Kyro 序列化器
  4. DatasetDataFrame 中保留資料的元資訊, 所以可以不再使用 Java 的序列化器和 Kyro 序列化器, 使用 Spark 特有的序列化協議, 生成 UnsafeInternalRow 用以儲存資料, 這樣不僅能減少資料量, 也能減少序列化和反序列化的開銷, 其速度大概能達到 RDD 的序列化的 20 倍左右