1. 程式人生 > >spark2.x-記憶體管理機制

spark2.x-記憶體管理機制

這裡寫圖片描述

MemoryManager

管理在jvm內部的spark整體的記憶體使用,該元件實現了將可用記憶體按任務劃分的策略。在記憶體(記憶體使用快取和資料傳輸)和執行之間分配記憶體(計算所使用的記憶體,如shuffles、joins、sorts和aggregations)。

執行記憶體指的是計算shuffles、joins、sorts和aggregations,而儲存記憶體指的是用於快取和傳播跨叢集的內部資料。每個JVM存在一個MemoryManager。

強制管理儲存(Storage)和執行(Execution)之間的記憶體使用,從 MemoryManager 申請可以把剩餘空間借給對方。所有 Task 的執行就是 ShuffleTask 的執行,ExecutionMemory是指 Shuffles,joins,sorts 和 aggregation 的操作;而 StorageMemory 是快取和廣播資料相關的,每一個 JVM 會產生一個 MemoryManager 來負責管理記憶體。MemoryManager 構造時,需要指定 onHeapStorageMemeory和 onHeapExecutionMemory 的引數。

在 MemoryManager 物件構造的時候建立 StorageMemoryPool 和 ExecutionMemoryPool 物件,用來管理了 Storage 和 Execution 的記憶體分配。

這裡是 StorageMemory 用來記錄 Storage 使用了多少記憶體
[下圖是 StorageMemoryPool.scala 中 memoryUsed 方法]

MemoryStore 也是被 BlockManager 管理的,以下是其中一個 MemoryStore 呼叫 acquireStorageMemory 方法的原始碼

這裡是 ExecutionMemory 用來記錄 Execution 使用了多少記憶體,它建立一些 HashMap 來儲存每個 Task 的記憶體使用量,把 Map 中的所有 Value 加起來便用當前 ExecutionMemory 的總使用量。

現在 Spark 2.1 預設的 MemoryManager 是 UnifiedMemoryManager,你可以看到下里有一段條件判斷的邏輯,如果 spark.memory.userLegacyMode 是 true 的話,MemeoryManager 便是 StaticMemoryManager,否則的話就是 Spark Unified Memory。

在 MemoryManager 中有一個很關鍵的程式碼,如果你想使用 OffHeap 作為儲存的話,你必需設定 spark.memory.offHeap.enabled 為 true,還有確定你的 offHeap 系統的空間必須大於 0。

MemoryConsumer
TaskMemoryManager對應於單個操作符和任務內的資料結構的客戶機。
TaskMemoryManager從記憶體消耗者接收記憶體分配請求並向消費者發出回撥為了在記憶體低執行時觸發溢位操作。

TaskMemoryManager
管理單獨每個task的記憶體分配。任務與TaskMemoryManager互動,而不直接與JVM記憶體管理器互動。

在內部,這些元件中的每一個都可用於儲存記賬:

MemoryPool
是一個記錄抽象 由記憶體管理器去跟蹤在儲存和執行之間記憶體的劃分

StorageMemoryPool
Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
* (caching).

ExecutionMemoryPool

用於在tasks之間分享可調節大小的記憶體池
確保每個task獲取一個合理的記憶體分配,而先獲取大量記憶體 然後迫使其他task重複溢位到磁碟上
如果有N個tasks ,確保每個任務能獲取至少記憶體的1/2N在它溢位到磁碟之前,最多1 / N,因為N是自動變化的
我們保持每個任務的集合 然後重新計算1/2N 和1/N 在等待物理這個集合改變。
這個是通過同步進入可變狀態 使用wait和notifyall 去通知呼叫者實現的
spark1.6以後,跨task之間記憶體的使用 由shuffleMemoryManager決定

MemoryManager有兩種動態處理memory pools大小的實現模型

StaticMemoryManager
強制一個硬性的邊界在儲存和執行記憶體中 通過靜態給spark記憶體分割槽,阻止儲存和執行互相借用記憶體,
這種模式已過時,保留只是為了相容性

  • A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
    *
  • The sizes of the execution and storage regions are determined through
  • spark.shuffle.memoryFraction and spark.storage.memoryFraction respectively. The two
  • regions are cleanly separated such that neither usage can borrow memory from the other.
    */
    這裡寫圖片描述
    這裡寫圖片描述

    UnifiedMemoryManager

Spark 2.0 中推出了聯合記憶體的概念,最主要的改變是儲存和執行的空間可以動態移動。需要注意的是執行比儲存有更大的優先值,當空間不夠時,可以向對方借空間,但前提是對方有足夠的空間或者是 Execution 可以強制把 Storage 一部份空間擠掉。Excution 向 Storage 借空間有兩種方式:第一種方式是 Storage 曾經向 Execution 借了空間,它快取的資料可能是非常的多,當 Execution 需要空間時可以強制拿回來;第二種方式是 Execution Memory 不足 50% 的情況下,Storgae Memory 會很樂意地把剩餘空間借給 Execution。

如果是你的計算比較複雜的情況,使用新型的記憶體管理 (Unified Memory Management) 會取得更好的效率,但是如果說計算的業務邏輯需要更大的快取空間,此時使用老版本的固定記憶體管理 (StaticMemoryManagement) 效果會更好

在spark1.6以上的版本中,預設強制在儲存和執行記憶體中指定一個彈性邊界,允許在一個區域記憶體請求,借用他人的記憶體來填滿

A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
*
* The region shared between execution and storage is a fraction of (the total heap space - 300MB)
* configurable through spark.memory.fraction (default 0.6). The position of the boundary
* within this space is further determined by spark.memory.storageFraction (default 0.5).
* This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
*
* Storage can borrow as much execution memory as is free until execution reclaims its space.
* When this happens, cached blocks will be evicted from memory until sufficient borrowed
* memory is released to satisfy the execution memory request.
*
* Similarly, execution can borrow as much storage memory as is free. However, execution
* memory is never evicted by storage due to the complexities involved in implementing this.
* The implication is that attempts to cache blocks may fail if execution has already eaten
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
*
* @param onHeapStorageRegionSize Size of the storage region, in bytes.
* This region is not statically reserved; execution can borrow from
* it if necessary. Cached blocks can be evicted only if actual
* storage memory usage exceeds this region.
資料快取與資料執行之間的記憶體可以相互移動,這是一種更彈性的方式,下圖顯示的是 Spark 2.x 版本對 Java 堆 (heap) 的使用情況,資料處理以及類的實體物件存放在 JVM 堆 (heap) 中
這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

Spark 2.1.0 新型 JVM Heap 分成三個部份:Reserved Memory、User Memory 和 Spark Memor

Spark Memeory:系統框架執行時需要使用的空間,這是從兩部份構成的,分別是 Storage Memeory 和 Execution Memory。現在 Storage 和 Execution (Shuffle) 採用了 Unified 的方式共同使用了 (Heap Size - 300MB) x 75%,預設情況下 Storage 和 Execution 各佔該空間的 50%。你可以從圖中可以看出,Storgae 和 Execution 的儲存空間可以往上和往下移動。
定義:所謂 Unified 的意思是 Storgae 和 Execution 在適當時候可以借用彼此的 Memory,需要注意的是,當 Execution 空間不足而且 Storage 空間也不足的情況下,Storage 空間如果曾經使用了超過 Unified 預設的 50% 空間的話則超過部份會被強制 drop 掉一部份資料來解決 Execution 空間不足的問題 (注意:drop 後資料會不會丟失主要是看你在程式設定的 storage_level 來決定你是 Drop 到那裡,可能 Drop 到磁碟上),這是因為執行(Execution) 比快取 (Storage) 是更重要的事情。

User Memory:寫 Spark 程式中產生的臨時資料或者是自己維護的一些資料結構也需要給予它一部份的儲存空間,你可以這麼認為,這是程式執行時使用者可以主導的空間,叫使用者操作空間。它佔用的空間是 (Java Heap - Reserved Memory) x 25% (預設是25%,可以有引數供調優),這樣設計可以讓使用者操作時所需要的空間與系統框架執行時所需要的空間分離開。假設 Executor 有 4G 的大小,那麼在預設情況下 User Memory 大小是:(4G - 300MB) x 25% = 949MB,也就是說一個 Stage 內部展開後 Task 的運算元在執行時最大的大小不能夠超過 949MB。例如工程師使用 mapPartition 等,一個 Task 內部所有運算元使用的資料空間的大小如果大於 949MB 的話,那麼就會出現 OOM。思考題:有 100個 Executors 每個 4G 大小,現在要處理 100G 的資料,假設這 100G 分配給 100個 Executors,每個 Executor 分配 1G 的資料,這 1G 的資料遠遠少於 4G Executor 記憶體的大小,為什麼還會出現 OOM 的情況呢?那是因為在你的程式碼中(e.g.你寫的應用程式運算元)超過使用者空間的限制 (e.g. 949MB),而不是 RDD 本身的資料超過了限制。

Reserved Memory:預設都是300MB,這個數字一般都是固定不變的,在系統執行的時候 Java Heap 的大小至少為 Heap Reserved Memory x 1.5. e.g. 300MB x 1.5 = 450MB 的 JVM配置。一般本地開發例如說在 Windows 系統上,建義系統至少 2G 的大小。

SparkMemory空間預設是佔可用 HeapSize 的 60%,與上圖顯示的75%有點不同,當然這個引數是可配置的!!

UnifiedMemoryManager 構造時呼叫工廠方法 apply( ),預設是把 Storage空間的50%給 Execution

你可以很清楚的看見:預設的 Reserved System Memory 是 300M,然後預設的 HeapStorageRegionSize 是 MaxMemory x 50%,如果實現了 OffHeapExecutionMemoryPool 你覺得會不會有從 StorageMemory 獲得儲存這個概念? 實際上不需要找 Storage 借空間。如果是 ShuffleTask 計算比較複雜的情況,使用 Unified Memory Management 會取得更好的效率,但是如果說計算的業務邏輯需要更大的快取空間,此時使用 StaticMemoryManagement 效果會更好。
RESERVED_SYSTEM_MEMORY_BYTES 引數和 apply 方法]

Unified 機制下有兩種方法 Execution 會向 Storage 借空間,現在配合原始碼來證明這個說法。
Unified Memory Manager 有兩個核心方法,第一個是 acquiredExecutionMemeory 和 acquireStorageMemory,當 ExecutionMemory 有剩餘空間時可以借給 StorageMemory,然後通過呼叫 StorageMemoryPool 的 acquireMemory 方法向 storageMemoryPool 申請空間。

acquiredExecutionMemory 主要是為當前的執行任務去獲得的執行空間,它首先會根據我們的 onHeap 和 offHeap 這兩種不同的方式來進行配。

在MemoryManager 構造的時候也分配一定的記憶體空間 poolSize

呼叫 computeMaxExecutionPoolSize 方法向 ExecutionPool 申請資源。過程中會呼叫 maybeGrowExecutionPool來判斷需要多少記憶體,包括計算記憶體空間的空閒資源與Storage曾經佔用的空間。

maybeGrowExecutionPool 方法會首先判斷申請的記憶體申請資源是大於0,然後判斷是剩餘空間和 Storage曾經佔用的空間多,把需要的記憶體資源量提交給 StorageMemoryPool 的 freeSpaceToShrinkPool 方法。

然後會判斷是當前 FreeSpace 能不能滿足 Execution 的需要,如果無法滿足則呼叫 MemoryStore的evictVlocksToFreeSpace方法在 StorageMemoryPool 中擠掉一部份資料。

呼叫 ExecutionPool 的 acquireMemory 方法向 ExecutionPool 申請記憶體資源,每個 Task 理論上講一般能使用的大小是從 poolSize /(2 x numActiveTasks) 到 maxPoolSize/numActiveTasks