1. 程式人生 > 實用技巧 >Spark核心解析——Spark 記憶體管理

Spark核心解析——Spark 記憶體管理

Spark 記憶體管理

在執行Spark 的應用程式時,Spark 叢集會啟動 Driver 和 Executor 兩種 JVM 程序,前者為主控程序,負責建立 Spark 上下文,提交 Spark 作業(Job),並將作業轉化為計算任務(Task),在各個 Executor 程序間協調任務的排程,後者負責在工作節點上執行具體的計算任務,並將結果返回給 Driver,同時為需要持久化的 RDD 提供儲存功能。由於 Driver 的記憶體管理相對來說較為簡單,本節主要對 Executor 的記憶體管理進行分析,下文中的 Spark 記憶體均特指 Executor 的記憶體。

6.1 堆內和堆外記憶體規劃

作為一個 JVM 程序,Executor 的記憶體管理建立在 JVM 的記憶體管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更為詳細的分配,以充分利用記憶體。同時,Spark 引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間,進一步優化了記憶體的使用。

堆內記憶體受到JVM統一管理,堆外記憶體是直接向作業系統進行記憶體的申請和釋放。

堆內記憶體

堆內記憶體的大小,由 Spark 應用程式啟動時的 –executor-memory 或 spark.executor.memory 引數配置。Executor 內執行的併發任務共享 JVM 堆內記憶體,這些任務在快取 RDD 資料和廣播(Broadcast)資料時佔用的記憶體被規劃為儲存(Storage)記憶體,而這些任務在執行 Shuffle 時佔用的記憶體被規劃為執行(Execution)記憶體,剩餘的部分不做特殊規劃,那些 Spark 內部的物件例項,或者使用者定義的 Spark 應用程式中的物件例項,均佔用剩餘的空間。不同的管理模式下,這三部分佔用的空間大小各不相同。

Spark 對堆內記憶體的管理是一種邏輯上的”規劃式”的管理,因為物件例項佔用記憶體的申請和釋放都由 JVM 完成,Spark 只能在申請後和釋放前記錄這些記憶體,我們來看其具體流程:

申請記憶體流程如下:

lSpark 在程式碼中 new 一個物件例項;

JVM 從堆內記憶體分配空間,建立物件並返回物件引用;

Spark 儲存該物件的引用,記錄該物件佔用的記憶體。

釋放記憶體流程如下:

Spark記錄該物件釋放的記憶體,刪除該物件的引用;

等待JVM的垃圾回收機制釋放該物件佔用的堆內記憶體。

我們知道,JVM 的物件可以以序列化的方式儲存,序列化的過程是將物件轉換為二進位制位元組流,本質上可以理解為將非連續空間的鏈式儲存轉化為連續空間或塊儲存,在訪問時則需要進行序列化的逆過程——反序列化,將位元組流轉化為物件,序列化的方式可以節省儲存空間,但增加了儲存和讀取時候的計算開銷。

對於 Spark 中序列化的物件,由於是位元組流的形式,其佔用的記憶體大小可直接計算,而對於非序列化的物件,其佔用的記憶體是通過週期性地取樣近似估算而得,即並不是每次新增的資料項都會計算一次佔用的記憶體大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際記憶體有可能遠遠超出預期。此外,在被 Spark 標記為釋放的物件例項,很有可能在實際上並沒有被 JVM 回收,導致實際可用的記憶體小於 Spark 記錄的可用記憶體。所以 Spark 並不能準確記錄實際可用的堆內記憶體,從而也就無法完全避免記憶體溢位(OOM, Out of Memory)的異常。

雖然不能精準控制堆內記憶體的申請和釋放,但 Spark 通過對儲存記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在儲存記憶體裡快取新的 RDD,以及是否為新的任務分配執行記憶體,在一定程度上可以提升記憶體的利用率,減少異常的出現。

堆外記憶體

為了進一步優化記憶體的使用以及提高 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間,儲存經過序列化的二進位制資料。

堆外記憶體意味著把記憶體物件分配在Java虛擬機器的堆以外的記憶體,這些記憶體直接受作業系統管理(而不是虛擬機器)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。

利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的儲存記憶體時不再基於 Tachyon,而是與堆外的執行記憶體一樣,基於 JDK Unsafe API 實現),Spark 可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的 GC 掃描和回收,提升了處理效能。堆外記憶體可以被精確地申請和釋放(堆外記憶體之所以能夠被精確的申請和釋放,是由於記憶體的申請和釋放不再通過JVM機制,而是直接向作業系統申請,JVM對於記憶體的清理是無法準確指定時間點的,因此無法實現精確的釋放),而且序列化的資料佔用的空間可以被精確計算,所以相比堆內記憶體來說降低了管理的難度,也降低了誤差。

在預設情況下堆外記憶體並不啟用,可通過配置 spark.memory.offHeap.enabled 引數啟用,並由 spark.memory.offHeap.size 引數設定堆外空間的大小。除了沒有 other 空間,堆外記憶體與堆內記憶體的劃分方式相同,所有執行中的併發任務共享儲存記憶體和執行記憶體。

(*該部分記憶體主要用於程式的共享庫、Perm Space、執行緒Stack和一些Memory mapping等, 或者類C方式allocate object)

6.2 記憶體空間分配

靜態記憶體管理

在 Spark 最初採用的靜態記憶體管理機制下,儲存記憶體、執行記憶體和其他記憶體的大小在 Spark 應用程式執行期間均為固定的,但使用者可以應用程式啟動前進行配置,堆內記憶體的分配如圖 2 所示:

可以看到,可用的堆內記憶體的大小需要按照程式碼清單1-1的方式計算:

可用的儲存記憶體 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safety Fraction
可用的執行記憶體 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safety Fraction

 

其中 systemMaxMemory 取決於當前 JVM 堆內記憶體的大小,最後可用的執行記憶體或者儲存記憶體要在此基礎上與各自的 memoryFraction 引數和 safetyFraction 引數相乘得出。上述計算公式中的兩個 safetyFraction 引數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,降低因實際記憶體超出當前預設範圍而導致 OOM 的風險(上文提到,對於非序列化物件的記憶體取樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並沒有區別對待,和”其它記憶體”一樣交給了 JVM 去管理。

Storage記憶體和Execution記憶體都有預留空間,目的是防止OOM,因為Spark堆內記憶體大小的記錄是不準確的,需要留出保險區域。

堆外的空間分配較為簡單,只有儲存記憶體和執行記憶體,如圖1-3所示。可用的執行記憶體和儲存記憶體佔用的空間大小直接由引數spark.memory.storageFraction 決定,由於堆外記憶體佔用的空間可以被精確計算,所以無需再設定保險區域。

 

靜態記憶體管理機制實現起來較為簡單,但如果使用者不熟悉 Spark 的儲存機制,或沒有根據具體的資料規模和計算任務或做相應的配置,很容易造成”一半海水,一半火焰”的局面,即儲存記憶體和執行記憶體中的一方剩餘大量的空間,而另一方卻早早被佔滿,不得不淘汰或移出舊的內容以儲存新的內容。由於新的記憶體管理機制的出現,這種方式目前已經很少有開發者使用,出於相容舊版本的應用程式的目的,Spark 仍然保留了它的實現。

統一記憶體管理

Spark 1.6 之後引入的統一記憶體管理機制,與靜態記憶體管理的區別在於儲存記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閒區域,統一記憶體管理的堆內記憶體結構如圖 1-4所示:

統一記憶體管理的堆外記憶體結構如圖 1-5所示:

其中最重要的優化在於動態佔用機制,其規則如下:

設定基本的儲存記憶體和執行記憶體區域(spark.storage.storageFraction 引數),該設定確定了雙方各自擁有的空間的範圍;

雙方的空間都不足時,則儲存到硬碟;若己方空間不足而對方空餘時,可借用對方的空間;(儲存空間不足是指不足以放下一個完整的 Block)

執行記憶體的空間被對方佔用後,可讓對方將佔用的部分轉存到硬碟,然後”歸還”借用的空間;

儲存記憶體的空間被對方佔用後,無法讓對方”歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為複雜。

統一記憶體管理的動態佔用機制如圖 1-6所示:

憑藉統一記憶體管理機制,Spark 在一定程度上提高了堆內和堆外記憶體資源的利用率,降低了開發者維護 Spark 記憶體的難度,但並不意味著開發者可以高枕無憂。如果儲存記憶體的空間太大或者說快取的資料過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的效能,因為快取的 RDD 資料通常都是長期駐留記憶體的。所以要想充分發揮 Spark 的效能,需要開發者進一步瞭解儲存記憶體和執行記憶體各自的管理方式和實現原理。

6.3 儲存記憶體管理

RDD的持久化機制

彈性分散式資料集(RDD)作為 Spark 最根本的資料抽象,是隻讀的分割槽記錄(Partition)的集合,只能基於在穩定物理儲存中的資料集上建立,或者在其他已有的 RDD 上執行轉換(Transformation)操作產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark 保證了每一個 RDD 都可以被重新恢復。但 RDD 的所有轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 才會建立任務讀取 RDD,然後真正觸發轉換的執行。

Task 在啟動之初讀取一個分割槽時,會先判斷這個分割槽是否已經被持久化,如果沒有則需要檢查 Checkpoint 或按照血統重新計算。所以如果一個 RDD 上要執行多次行動,可以在第一次行動中使用 persist 或 cache 方法,在記憶體或磁碟中持久化或快取這個 RDD,從而在後面的行動時提升計算速度。

事實上,cache 方法是使用預設的 MEMORY_ONLY 的儲存級別將 RDD 持久化到記憶體,故快取是一種特殊的持久化。 堆內和堆外儲存記憶體的設計,便可以對快取 RDD 時使用的記憶體做統一的規劃和管理

RDD 的持久化由 Spark 的 Storage 模組負責,實現了 RDD 與物理儲存的解耦合。Storage 模組負責管理 Spark 在計算過程中產生的資料,將那些在記憶體或磁碟、在本地或遠端存取資料的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模組構成了主從式的架構,即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。

Storage 模組在邏輯上以 Block 為基本儲存單位,RDD 的每個 Partition 經過處理後唯一對應一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Driver端的Master 負責整個 Spark 應用程式的 Block 的元資料資訊的管理和維護,而Executor端的 Slave 需要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不同的儲存級別 ,而儲存級別是以下 5 個變數的組合:

class StorageLevel private(

private var _useDisk: Boolean, //磁碟

private var _useMemory: Boolean, //這裡其實是指堆內記憶體

private var _useOffHeap: Boolean, //堆外記憶體

 private var _deserialized: Boolean, //是否為非序列化

private var _replication: Int = 1 //副本個數

l   )

Spark中7種儲存級別如下:

持久化級別

含義

MEMORY_ONLY

以非序列化的Java物件的方式持久化在JVM記憶體中。如果記憶體無法完全儲存RDD所有的partition,那麼那些沒有持久化的partition就會在下一次需要使用它們的時候,重新被計算

MEMORY_AND_DISK

同上,但是當某些partition無法儲存在記憶體中時,會持久化到磁碟中。下次需要使用這些partition時,需要從磁碟上讀取

MEMORY_ONLY_SER

同MEMORY_ONLY,但是會使用Java序列化方式,將Java物件序列化後進行持久化。可以減少記憶體開銷,但是需要進行反序列化,因此會加大CPU開銷

MEMORY_AND_DISK_SER

同MEMORY_AND_DISK,但是使用序列化方式持久化Java物件

DISK_ONLY

使用非序列化Java物件的方式持久化,完全儲存到磁碟上

MEMORY_ONLY_2

MEMORY_AND_DISK_2

等等

如果是尾部加了2的持久化級別,表示將持久化資料複用一份,儲存到其他節點,從而在資料丟失時,不需要再次計算,只需要使用備份資料即可

通過對資料結構的分析,可以看出儲存級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的儲存方式:

儲存位置:磁碟/堆內記憶體/堆外記憶體。如 MEMORY_AND_DISK 是同時在磁碟和堆內記憶體上儲存,實現了冗餘備份。OFF_HEAP 則是隻在堆外記憶體儲存,目前選擇堆外記憶體時不能同時儲存到其他位置。

儲存形式:Block 快取到儲存記憶體後,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式儲存,OFF_HEAP 是序列化方式儲存。

副本數量:大於 1 時需要遠端冗餘備份到其他節點。如 DISK_ONLY_2 需要遠端備份 1 個副本。

RDD的快取過程

RDD 在快取到儲存記憶體之前,Partition 中的資料一般以迭代器(Iterator)的資料結構來訪問,這是 Scala 語言中一種遍歷資料集合的方法。通過 Iterator 可以獲取分割槽中每一條序列化或者非序列化的資料項(Record),這些 Record 的物件例項在邏輯上佔用了 JVM 堆內記憶體的 other 部分的空間,同一 Partition 的不同 Record 的儲存空間並不連續

RDD 在快取到儲存記憶體之後,Partition 被轉換成 Block,Record 在堆內或堆外儲存記憶體中佔用一塊連續的空間。Partition由不連續的儲存空間轉換為連續儲存空間的過程,Spark稱之為"展開"Unroll

Block 有序列化和非序列化兩種儲存格式,具體以哪種方式取決於該 RDD 的儲存級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的資料結構定義,用一個數組儲存所有的物件例項,序列化的 Block 則以 SerializedMemoryEntry的資料結構定義,用位元組緩衝區(ByteBuffer)來儲存二進位制資料。每個 Executor 的 Storage 模組用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外儲存記憶體中所有的 Block 物件的例項,對這個 LinkedHashMap 新增和刪除間接記錄了記憶體的申請和釋放。

因為不能保證儲存空間可以一次容納 Iterator 中的所有資料,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時可以繼續進行

對於序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。

對於非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,取樣估算其所需的 Unroll 空間並進行申請,空間不足時可以中斷,釋放已佔用的 Unroll 空間。

如果最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換為正常的快取 RDD 的儲存空間,如下圖所示。

在靜態記憶體管理時,Spark 在儲存記憶體中專門劃分了一塊 Unroll 空間,其大小是固定的,統一記憶體管理時則沒有對 Unroll 空間進行特別區分,當儲存空間不足時會根據動態佔用機制進行處理。

淘汰與落盤

由於同一個 Executor 的所有的計算任務共享有限的儲存記憶體空間,當有新的 Block 需要快取但是剩餘空間不足且無法動態佔用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其儲存級別中同時包含儲存到磁碟的要求,則要對其進行落盤(Drop),否則直接刪除該 Block

儲存記憶體的淘汰規則為:

被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內記憶體;

新舊 Block 不能屬於同一個 RDD,避免迴圈淘汰;

舊 Block 所屬 RDD 不能處於被讀狀態,避免引發一致性問題;

遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。

落盤的流程則比較簡單,如果其儲存級別符合_useDisk 為 true 的條件,再根據其_deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最後將資料儲存到磁碟,在 Storage 模組中更新其資訊。

6.4 執行記憶體管理

執行記憶體主要用來儲存任務在執行 Shuffle 時佔用的記憶體,Shuffle 是按照一定規則對 RDD 資料重新分割槽的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執行記憶體的使用:

Shuffle Write

若在 map 端選擇普通的排序方式,會採用 ExternalSorter 進行外排,在記憶體中儲存資料時主要佔用堆內執行空間。

若在 map 端選擇 Tungsten 的排序方式,則採用 ShuffleExternalSorter 直接對以序列化形式儲存的資料排序,在記憶體中儲存資料時可以佔用堆外或堆內執行空間,取決於使用者是否開啟了堆外記憶體以及堆外執行記憶體是否足夠。

Shuffle Read

在對 reduce 端的資料進行聚合時,要將資料交給 Aggregator 處理,在記憶體中儲存資料時佔用堆內執行空間。

如果需要進行最終結果排序,則要將再次將資料交給 ExternalSorter 處理,佔用堆內執行空間。

在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的雜湊表在堆內執行記憶體中儲存資料,但在 Shuffle 過程中所有資料並不能都儲存到該雜湊表中,當這個雜湊表佔用的記憶體會進行週期性地取樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執行記憶體時,Spark 就會將其全部內容儲存到磁碟檔案中,這個過程被稱為溢存(Spill),溢存到磁碟的檔案最後會被歸併(Merge)。

Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化記憶體和 CPU 使用的計劃(鎢絲計劃),解決了一些 JVM 在效能上的限制和弊端。Spark 會根據 Shuffle 的情況來自動選擇是否採用 Tungsten 排序。

Tungsten 採用的頁式記憶體管理機制建立在 MemoryManager 之上,即 Tungsten 對執行記憶體的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關心資料具體儲存在堆內還是堆外。

每個記憶體頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變數統一標識一個記憶體頁在系統記憶體中的地址。

堆內的 MemoryBlock 是以 long 型陣列的形式分配的記憶體,其 obj 的值為是這個陣列的物件引用,offset 是 long 型陣列的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個陣列在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的記憶體塊,其 obj 為 null,offset 是這個記憶體塊在系統記憶體中的 64 位絕對地址。Spark MemoryBlock 巧妙地將堆內和堆外記憶體頁統一抽象封裝,並用頁表(pageTable)管理每個 Task 申請到的記憶體頁

Tungsten 頁式管理下的所有記憶體用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:

頁號:佔 13 位,唯一標識一個記憶體頁,Spark 在申請記憶體頁之前要先申請空閒頁號。

頁內偏移量:佔 51 位,是在使用記憶體頁儲存資料時,資料在頁內的偏移地址。

有了統一的定址方式,Spark 可以用 64 位邏輯地址的指標定位到堆內或堆外的記憶體,整個 Shuffle Write 排序的過程只需要對指標進行排序,並且無需反序列化,整個過程非常高效,對於記憶體訪問效率和 CPU 使用效率帶來了明顯的提升。

Spark 的儲存記憶體和執行記憶體有著截然不同的管理方式:對於儲存記憶體來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要快取的 RDD 的 Partition 轉化而成;而對於執行記憶體,Spark 用 AppendOnlyMap 來儲存 Shuffle 過程中的資料,在 Tungsten 排序中甚至抽象成為頁式記憶體管理,開闢了全新的 JVM 記憶體管理機制。