1. 程式人生 > 實用技巧 >Spark Executor記憶體管理

Spark Executor記憶體管理

http://arganzheng.life/spark-executor-memory-management.html

我們都知道 Spark 能夠有效的利用記憶體並進行分散式計算,其記憶體管理模組在整個系統中扮演著非常重要的角色。為了更好地利用 Spark,深入地理解其記憶體管理模型具有非常重要的意義,這有助於我們對 Spark 進行更好的調優;在出現各種記憶體問題時,能夠摸清頭腦,找到哪塊記憶體區域出現問題。

首先我們知道在執行 Spark 的應用程式時,Spark 叢集會啟動 Driver 和 Executor 兩種 JVM 程序,前者為主控程序,負責建立 Spark 上下文,提交 Spark 作業(Job),並將作業轉化為計算任務(Task),在各個 Executor 程序間協調任務的排程,後者負責在工作節點上執行具體的計算任務,並將結果返回給 Driver,同時為需要持久化的 RDD 提供儲存功能。由於 Driver 的記憶體管理相對來說較為簡單,本文主要對 Executor 的記憶體管理進行分析,下文中的 Spark 記憶體均特指 Executor 的記憶體。

另外,Spark 1.6 之前使用的是靜態記憶體管理 (StaticMemoryManager) 機制,StaticMemoryManager 也是 Spark 1.6 之前唯一的記憶體管理器。在 Spark1.6 之後引入了統一記憶體管理 (UnifiedMemoryManager) 機制,UnifiedMemoryManager 是 Spark 1.6 之後預設的記憶體管理器,1.6 之前採用的靜態管理(StaticMemoryManager)方式仍被保留,可通過配置spark.memory.useLegacyMode引數啟用。這裡僅對統一記憶體管理模組 (UnifiedMemoryManager) 機制進行分析。

Executor記憶體總體佈局

預設情況下,Executor不開啟堆外記憶體,因此整個 Executor 端記憶體佈局如下圖所示:

我們可以看到在Yarn叢集管理模式中,Spark 以 Executor Container 的形式在 NodeManager 中執行,其可使用的記憶體上限由yarn.scheduler.maximum-allocation-mb指定,我們稱之為 MonitorMemory。

整個Executor記憶體區域分為兩塊:

1、JVM堆外記憶體

大小由spark.yarn.executor.memoryOverhead引數指定。預設大小為executorMemory * 0.10

, with minimum of 384m。

此部分記憶體主要用於JVM自身,字串, NIO Buffer(Driect Buffer)等開銷。此部分為使用者程式碼及Spark 不可操作的記憶體,不足時可通過調整引數解決。

The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).

2、堆內記憶體(ExecutorMemory)

大小由 Spark 應用程式啟動時的–executor-memoryspark.executor.memory引數配置,即JVM最大分配的堆記憶體 (-Xmx)。Spark為了更高效的使用這部分記憶體,對這部分記憶體進行了邏輯上的劃分管理。我們在下面的統一記憶體管理會詳細介紹。

NOTES

對於Yarn叢集,存在:ExecutorMemory + MemoryOverhead <= MonitorMemory,若應用提交之時,指定的 ExecutorMemory 與 MemoryOverhead 之和大於 MonitorMemory,則會導致 Executor 申請失敗;若執行過程中,實際使用記憶體超過上限閾值,Executor 程序會被 Yarn 終止掉 (kill)。

統一記憶體管理

Spark 1.6之後引入了統一記憶體管理,包括了堆內記憶體 (On-heap Memory) 和堆外記憶體 (Off-heap Memory) 兩大區域,下面對這兩塊區域進行詳細的說明。

堆內記憶體 (On-heap Memory)

預設情況下,Spark 僅僅使用了堆內記憶體。Spark 對堆內記憶體的管理是一種邏輯上的“規劃式”的管理,Executor 端的堆內記憶體區域在邏輯上被劃分為以下四個區域:

  1. 執行記憶體 (Execution Memory) : 主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程中的臨時資料;
  2. 儲存記憶體 (Storage Memory) : 主要用於儲存 spark 的 cache 資料,例如RDD的快取、unroll資料;
  3. 使用者記憶體(User Memory): 主要用於儲存 RDD 轉換操作所需要的資料,例如 RDD 依賴等資訊;
  4. 預留記憶體(Reserved Memory): 系統預留記憶體,會用來儲存Spark內部物件。

下面的圖對這個四個記憶體區域的分配比例做了詳細的描述:

1、預留記憶體 (Reserved Memory)

系統預留記憶體,會用來儲存Spark內部物件。其大小在程式碼中是寫死的,其值等於 300MB,這個值是不能修改的(如果在測試環境下,我們可以通過spark.testing.reservedMemory引數進行修改);如果Executor分配的記憶體小於 1.5 * 300 = 450M 時,Executor將無法執行。

2、儲存記憶體 (Storage Memory)

主要用於儲存 spark 的 cache 資料,例如 RDD 的快取、廣播(Broadcast)資料、和 unroll 資料。記憶體佔比為UsableMemory * spark.memory.fraction * spark.memory.storageFraction,Spark 2+ 中,預設初始狀態下 Storage Memory 和 Execution Memory 均約佔系統總記憶體的30%(1 * 0.6 * 0.5 = 0.3)。在 UnifiedMemory 管理中,這兩部分記憶體可以相互借用,具體借用機制我們下一小節會詳細介紹。

3、執行記憶體 (Execution Memory)

主要用於存放 Shuffle、Join、Sort、Aggregation 等計算過程中的臨時資料。記憶體佔比為UsableMemory * spark.memory.fraction * (1 - spark.memory.storageFraction),Spark 2+ 中,預設初始狀態下 Storage Memory 和 Execution Memory 均約佔系統總記憶體的30%(1 * 0.6 * (1 - 0.5) = 0.3)。在 UnifiedMemory 管理中,這兩部分記憶體可以相互借用,具體借用機制我們下一小節會詳細介紹。

4、其他/使用者記憶體 (Other/User Memory) : 主要用於儲存 RDD 轉換操作所需要的資料,例如 RDD 依賴等資訊。記憶體佔比為UsableMemory * (1 - spark.memory.fraction),在Spark2+ 中,預設佔可用記憶體的40%(1 * (1 - 0.6) = 0.4)。

其中,usableMemory = executorMemory - reservedMemory,這個就是 Spark 可用記憶體。

NOTES

1、為什麼設定300M預留記憶體

統一記憶體管理最初版本other這部分記憶體沒有固定值 300M 設定,而是和靜態記憶體管理相似,設定的百分比,最初版本佔 25%。百分比設定在實際使用中出現了問題,若給定的記憶體較低時,例如 1G,會導致 OOM,具體討論參考這裡Make unified memory management work with small heaps。因此,other這部分記憶體做了修改,先劃出 300M 記憶體。

2、spark.memory.fraction由 0.75 降至 0.6

spark.memory.fraction最初版本的值是 0.75,很多分析統一記憶體管理這塊的文章也是這麼介紹的,同樣的,在使用中發現這個值設定的偏高,導致了 gc 時間過長,spark 2.0 版本將其調整為 0.6,詳細談論參見Reduce spark.memory.fraction default to avoid overrunning old gen in JVM default config

堆外記憶體 (Off-heap Memory)

Spark 1.6 開始引入了 Off-heap memory (詳見SPARK-11389)。這種模式不在 JVM 內申請記憶體,而是呼叫 Java 的 unsafe 相關 API 進行諸如 C 語言裡面的malloc()直接向作業系統申請記憶體。這種方式下 Spark 可以直接作業系統堆外記憶體,減少了不必要的記憶體開銷,以及頻繁的 GC 掃描和回收,提升了處理效能。另外,堆外記憶體可以被精確地申請和釋放,而且序列化的資料佔用的空間可以被精確計算,所以相比堆內記憶體來說降低了管理的難度,也降低了誤差。,缺點是必須自己編寫記憶體申請和釋放的邏輯。

預設情況下Off-heap模式的記憶體並不啟用,我們可以通過spark.memory.offHeap.enabled引數開啟,並由spark.memory.offHeap.size指定堆外記憶體的大小,單位是位元組(佔用的空間劃歸 JVM OffHeap 記憶體)。

如果堆外記憶體被啟用,那麼 Executor 內將同時存在堆內和堆外記憶體,兩者的使用互補影響,這個時候 Executor 中的 Execution 記憶體是堆內的 Execution 記憶體和堆外的 Execution 記憶體之和,同理,Storage 記憶體也一樣。其記憶體分佈如下圖所示:

相比堆內記憶體,堆外記憶體只區分 Execution 記憶體和 Storage 記憶體:

1、儲存記憶體 (Storage Memory)

記憶體佔比為maxOffHeapMemory * spark.memory.storageFraction,Spark 2+ 中,預設初始狀態下 Storage Memory 和 Execution Memory 均約佔系統總記憶體的50%(1 * 0.5 = 0.5)。在 UnifiedMemory 管理中,這兩部分記憶體可以相互借用,具體借用機制我們下一小節會詳細介紹。

2、執行記憶體 (Execution Memory)

記憶體佔比為maxOffHeapMemory * (1 - spark.memory.storageFraction),Spark 2+ 中,預設初始狀態下 Storage Memory 和 Execution Memory 均約佔系統總記憶體的50%(1 * (1 - 0.5) = 0.5)。在 UnifiedMemory 管理中,這兩部分記憶體可以相互借用,具體借用機制我們下一小節會詳細介紹。

Execution 記憶體和 Storage 記憶體動態佔用機制

在 Spark 1.5 之前,Execution 記憶體和 Storage 記憶體分配是靜態的,換句話說就是如果 Execution 記憶體不足,即使 Storage 記憶體有很大空閒程式也是無法利用到的;反之亦然。

靜態記憶體管理機制實現起來較為簡單,但如果使用者不熟悉 Spark 的儲存機制,或沒有根據具體的資料規模和計算任務或做相應的配置,很容易造成”一半海水,一半火焰”的局面,即儲存記憶體和執行記憶體中的一方剩餘大量的空間,而另一方卻早早被佔滿,不得不淘汰或移出舊的內容以儲存新的內容。

統一記憶體管理機制,與靜態記憶體管理最大的區別在於儲存記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閒區域:

其中最重要的優化在於動態佔用機制,其規則如下:

  • 程式提交的時候我們都會設定基本的 Execution 記憶體和 Storage 記憶體區域(通過spark.memory.storageFraction引數設定)。我們用 onHeapStorageRegionSize 來表示spark.storage.storageFraction劃分的儲存記憶體區域。這部分記憶體是不可以被驅逐(Evict)的儲存記憶體(但是如果空閒是可以被佔用的)。
  • 當計算記憶體不足時,可以借用 onHeapStorageRegionSize 中未使用部分,且 Storage 記憶體的空間被對方佔用後,需要等待執行記憶體自己釋放,不能搶佔。
  • 若實際 StorageMemory 使用量超過 onHeapStorageRegionSize,那麼當計算記憶體不足時,可以驅逐並借用StorageMemory – onHeapStorageRegionSize部分,而 onHeapStorageRegionSize 部分不可被搶佔。
  • 反之,當儲存記憶體不足時(儲存空間不足是指不足以放下一個完整的 Block),也可以借用計算記憶體空間;但是 Execution 記憶體的空間被儲存記憶體佔用後,是可讓對方將佔用的部分轉存到硬碟,然後“歸還”借用的空間。
  • 如果雙方的空間都不足時,則儲存到硬碟;將記憶體中的塊儲存到磁碟的策略是按照 LRU 規則進行的。

說明

1、出於相容舊版本的應用程式的目的,Spark 仍然保留了它的實現。可通過配置spark.memory.useLegacyMode引數啟用。

2、spark.memory.storageFraction是不可被驅逐的記憶體空間。只有空閒的時候能夠被執行記憶體佔用,但是不能被驅逐搶佔。

Amount of storage memory immune to eviction, expressed as a fraction of the size of the region set aside by s​park.memory.fraction. The higher this is, the less working memory may be available to execution and tasks may spill to disk more often. Leaving this at the default value is recommended. For more detail, see this description.

3、Storage 記憶體的空間被對方佔用後,目前的實現是無法讓對方”歸還”,因為需要考慮 Shuffle 過程中的很多因素,實現起來較為複雜;而且 Shuffle 過程產生的檔案在後面一定會被使用到,而 Cache 在記憶體的資料不一定在後面使用。在Unified Memory Management in Spark 1.6中詳細講解了為何選擇這種策略,簡單總結如下:

  1. 資料清除的開銷 : 驅逐storage記憶體的開銷取決於 storage level,MEMORY_ONLY 可能是最昂貴的,因為需要重新計算,MEMORY_AND_DISK_SER 正好相反,只涉及到磁碟IO。溢寫 execution 記憶體到磁碟的開銷並不昂貴,因為 execution 儲存的資料格式緊湊(compact format),序列化開銷低。並且,清除的 storage 記憶體可能不會被用到,但是,可以預見的是,驅逐的 execution 記憶體是必然會再被讀到記憶體的,頻繁的驅除重讀 execution 記憶體將導致昂貴的開銷。
  2. 實現的複雜度 : storage 記憶體的驅逐是容易實現的,只需要使用已有的方法,drop 掉 block。execution 則複雜的多,首先,execution 以 page 為單位管理這部分記憶體,並且確保相應的操作至少有 one page ,如果把這 one page 記憶體驅逐了,對應的操作就會處於飢餓狀態。此外,還需要考慮 execution 記憶體被驅逐的情況下,等待 cache 的 block 如何處理。

4、上面說的借用對方的記憶體需要借用方和被借用方的記憶體型別都一樣,都是堆內記憶體或者都是堆外記憶體,不存在堆內記憶體不夠去借用堆外記憶體的空間。

任務記憶體管理(Task Memory Manager)

Executor 中任務以執行緒的方式執行,各執行緒共享JVM的資源(即 Execution 記憶體),任務之間的記憶體資源沒有強隔離(任務沒有專用的Heap區域)。因此,可能會出現這樣的情況:先到達的任務可能佔用較大的記憶體,而後到的任務因得不到足夠的記憶體而掛起。

在 Spark 任務記憶體管理中,使用 HashMap 儲存任務與其消耗記憶體的對映關係。每個任務可佔用的記憶體大小為潛在可使用計算記憶體( 潛在可使用計算記憶體為: 初始計算記憶體 + 可搶佔儲存記憶體)的 1/2n ~ 1/n,當剩餘記憶體為小於 1/2n 時,任務將被掛起,直至有其他任務釋放執行記憶體,而滿足記憶體下限 1/2n,任務被喚醒。其中 n 為當前 Executor 中活躍的任務樹。

比如如果 Execution 記憶體大小為 10GB,當前 Executor 內正在執行的 Task 個數為5,則該 Task 可以申請的記憶體範圍為 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB 的範圍。

任務執行過程中,如果需要更多的記憶體,則會進行申請,如果存在空閒記憶體,則自動擴容成功,否則,將丟擲 OutOffMemroyError。

每個 Executor 中可同時執行的任務數由 Executor 分配的 CPU 的核數 N 和每個任務需要的 CPU 核心數 C 決定。其中:

N = spark.executor.cores
C = spark.task.cpus

由此每個 Executor 的最大任務並行度可表示為 :TP = N / C

其中,C 值與應用型別有關,大部分應用使用預設值 1 即可,因此,影響 Executor 中最大任務並行度(最大活躍task數)的主要因素是 N。

依據 Task 的記憶體使用特徵,前文所述的 Executor 記憶體模型可以簡單抽象為下圖所示模型:

其中,Executor 向 yarn 申請的總記憶體可表示為 :M = M1 + M2

如果考慮堆外記憶體則大概是如下結構:

一個示例

為了更好的理解上面堆內記憶體和堆外記憶體的使用情況,這裡給出一個簡單的例子。

只用了堆內記憶體

現在我們提交的 Spark 作業關於記憶體的配置如下:--executor-memory 18g

由於沒有設定spark.memory.fractionspark.memory.storageFraction引數,我們可以看到 Spark UI 關於 Storage Memory 的顯示如下:

上圖很清楚地看到 Storage Memory 的可用記憶體是 10.1GB,這個數是咋來的呢?根據前面的規則,我們可以得出以下的計算:

systemMemory = spark.executor.memory
reservedMemory = 300MB
usableMemory = systemMemory - reservedMemory
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction

如果我們把資料代進去,得出以下的結果:

systemMemory = 18Gb = 19327352832 位元組
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 19327352832 - 314572800 = 19012780032
StorageMemory = usableMemory * spark.memory.fraction * spark.memory.storageFraction
              = 19012780032 * 0.6 * 0.5 = 5703834009.6 = 5.312109375GB

和上面的 10.1GB 對不上啊。為什麼呢?這是因為 Spark UI 上面顯示的 Storage Memory 可用記憶體其實等於 Execution 記憶體和 Storage 記憶體之和,也就是usableMemory * spark.memory.fraction:

StorageMemory = usableMemory * spark.memory.fraction
              = 19012780032 * 0.6 = 11407668019.2 = 10.62421GB

還是不對,這是因為我們雖然設定了--executor-memory 18g,但是 Spark 的 Executor 端通過Runtime.getRuntime.maxMemory拿到的記憶體其實沒這麼大,只有 17179869184 位元組,所以 systemMemory=17179869184,然後計算的資料如下:

systemMemory = 17179869184 位元組
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory= usableMemory * spark.memory.fraction
             = 16865296384 * 0.6 = 9.42421875 GB

我們通過將上面的 16865296384 * 0.6 位元組除於 1024 * 1024 * 1024 轉換成 9.42421875 GB,和 UI 上顯示的還是對不上,這是因為 Spark UI 是通過除於 1000 * 1000 * 1000 將位元組轉換成 GB,如下:

systemMemory = 17179869184 位元組
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
StorageMemory = usableMemory * spark.memory.fraction
              = 16865296384 * 0.6 位元組 =  16865296384 * 0.6 / (1000 * 1000 * 1000) = 10.1GB

現在終於對上了。

具體將位元組轉換成 GB 的計算邏輯如下(core 模組下面的 /core/src/main/resources/org/apache/spark/ui/static/utils.js):

functionformatBytes(bytes, type) {
    if(type !=='display')returnbytes;
    if(bytes == 0)return'0.0 B';
    vark = 1000;
    vardm = 1;
    varsizes = ['B','KB','MB','GB','TB','PB','EB','ZB','YB'];
    vari = Math.floor(Math.log(bytes) / Math.log(k));
    returnparseFloat((bytes / Math.pow(k, i)).toFixed(dm)) +''+ sizes[i];
}

我們設定了--executor-memory 18g,但是 Spark 的 Executor 端通過Runtime.getRuntime.maxMemory拿到的記憶體其實沒這麼大,只有 17179869184 位元組,這個資料是怎麼計算的?

Runtime.getRuntime.maxMemory是程式能夠使用的最大記憶體,其值會比實際配置的執行器記憶體的值小。這是因為記憶體分配池的堆部分劃分為 Eden,Survivor 和 Tenured 三部分空間,而這裡面一共包含了兩個 Survivor 區域,而這兩個 Survivor 區域在任何時候我們只能用到其中一個,所以我們可以使用下面的公式進行描述 :

ExecutorMemory = Eden + 2 * Survivor + Tenured 
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured 

上面的 17179869184 位元組可能因為你的 GC 配置不一樣得到的資料不一樣,但是上面的計算公式是一樣的。

用了堆內和堆外記憶體

現在如果我們啟用了堆外記憶體,情況會怎樣呢?我們的記憶體相關配置如下:

spark.executor.memory           18g
spark.memory.offHeap.enabled   true
spark.memory.offHeap.size       10737418240

從上面可以看出,堆外記憶體為 10GB,現在 Spark UI 上面顯示的 Storage Memory 可用記憶體為 20.9GB,如下:

其實 Spark UI 上面顯示的 Storage Memory 可用記憶體等於堆內記憶體和堆外記憶體之和,計算公式如下:

堆內:

systemMemory = 17179869184 位元組
reservedMemory = 300MB = 300 * 1024 * 1024 = 314572800
usableMemory = systemMemory - reservedMemory = 17179869184 - 314572800 = 16865296384
totalOnHeapStorageMemory = usableMemory * spark.memory.fraction
                         = 16865296384 * 0.6 = 10119177830

堆外:

totalOffHeapStorageMemory = spark.memory.offHeap.size = 10737418240

總 Storage 記憶體:

StorageMemory = totalOnHeapStorageMemory + totalOffHeapStorageMemory
              = (10119177830 + 10737418240) 位元組
              = (20856596070 / (1000 * 1000 * 1000)) GB
              = 20.9 GB

Executor記憶體引數調優

1. Executor JVM Used Memory Heuristic

現象:配置的executor記憶體比實際使用的JVM最大使用記憶體還要大很多。

原因:這意味著 executor 記憶體申請過多了,實際上並不需要使用這麼多記憶體。

解決方案:將spark.executor.memory設定為一個比較小的值。

例如:

spark.executor.memory : 16 GB
Max executor peak JVM used memory : 6.6 GB

Suggested spark.executor.memory : 7 GB 

2. Executor Unified Memory Heuristic

現象:分配的統一記憶體 (Unified Memory = Storage Memory + Execution Memory) 比 executor 實際使用的統一記憶體大的多。

原因:這意味著不需要這麼大的統一記憶體。

解決方案:降低spark.memory.fraction的比例。

例如:

spark.executor.memory : 10 GB 
spark.memory.fraction : 0.6
Allocated unified memory : 6 GB
Max peak JVM userd memory : 7.2 GB
Max peak unified memory : 1.2 GB

Suggested spark.memory.fraction : 0.2

3. Executor OOM類錯誤 (錯誤程式碼 137、143等)

該類錯誤一般是由於 Heap(M2)已達上限,Task 需要更多的記憶體,而又得不到足夠的記憶體而導致。因此,解決方案要從增加每個 Task 的記憶體使用量,滿足任務需求 或 降低單個 Task 的記憶體消耗量,從而使現有記憶體可以滿足任務執行需求兩個角度出發。因此有如下解決方案:

法一:增加單個task的記憶體使用量

  • 增加最大 Heap值,即上圖中 M2 的值,使每個 Task 可使用記憶體增加。
  • 降低 Executor 的可用 Core 的數量 N , 使 Executor 中同時執行的任務數減少,在總資源不變的情況下,使每個 Task 獲得的記憶體相對增加。當然,這會使得 Executor 的並行度下降。可以通過調高spark.executor.instances引數來申請更多的 executor 例項(或者通過spark.dynamicAllocation.enabled啟動動態分配),提高job的總並行度。

法二: 降低單個Task的記憶體消耗量

降低單個Task的記憶體消耗量可從配置方式和調整應用邏輯兩個層面進行優化:

一、配置方式

減少每個 Task 處理的資料量,可降低 Task 的記憶體開銷,在 Spark 中,每個 partition 對應一個處理任務 Task,因此,在資料總量一定的前提下,可以通過增加 partition 數量的方式來減少每個 Task 處理的資料量,從而降低 Task 的記憶體開銷。針對不同的 Spark 應用型別,存在不同的 partition 配置引數 :

P = spark.default.parallism (非SQL應用)
P = spark.sql.shuffle.partition (SQL 應用)

通過增加 P 的值,可在一定程度上使 Task 現有記憶體滿足任務執行。注: 當調整一個引數不能解決問題時,上述方案應進行協同調整。

二、調整應用邏輯

Executor OOM 一般發生 Shuffle 階段,該階段需求計算記憶體較大,且應用邏輯對記憶體需求有較大影響,下面舉例就行說明:

1、選擇合適的運算元,如 groupByKey 轉換為 reduceByKey

一般情況下,groupByKey 能實現的功能使用 reduceByKey 均可實現,而 ReduceByKey 存在 Map 端的合併,可以有效減少傳輸頻寬佔用及 Reduce 端記憶體消耗。

2、避免資料傾斜 (data skew)

Data Skew 是指任務間處理的資料量存大較大的差異。

如左圖所示,key 為 010 的資料較多,當發生 shuffle 時,010 所在分割槽存在大量資料,不僅拖慢 Job 執行(Job 的執行時間由最後完成的任務決定)。 而且導致 010 對應 Task 記憶體消耗過多,可能導致 OOM。

而右圖,經過預處理(加鹽,此處僅為舉例說明問題,解決方法不限於此)可以有效減少 Data Skew 導致的問題。

NOTE

上述舉例僅為說明調整應用邏輯可以在一定程式上解決OOM問題,解決方法不限於次。

4. Execution Memory Spill Heuristic

現象: 在 stage 3 發現執行記憶體溢位。Shuffle read bytes 和 spill 分佈均勻。這個 stage 有 200 個 tasks。

原因: 執行記憶體溢位,意味著執行記憶體不足。跟上面的 OOM 錯誤一樣,只是執行記憶體不足的情況下不會報 OOM 而是會將資料溢位到磁碟。但是整個效能很難接受。

解決方案: 同 3。

4. Executor GC Heuristic

現象: Executor 花費很多時間在 GC。

原因: 可以通過-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps檢視 GC 情況

解決方案:Garbage Collection Tuning

5. Beyond … memory, killed by yarn.

出現該問題原因是由於實際使用記憶體上限超過申請的記憶體上限而被 Yarn 終止掉了, 首先說明 Yarn 中 Container 的記憶體監控機制:

  • Container 程序的記憶體使用量 : 以 Container 程序為根的程序樹中所有程序的記憶體使用總量。
  • Container 被殺死的判斷依據 : 程序樹總記憶體(實體記憶體或虛擬記憶體)使用量超過向 Yarn 申請的記憶體上限值,則認為該 Container 使用記憶體超量,可以被“殺死”。

因此,對該異常的分析要從是否存在子程序兩個角度出發。

1、不存在子程序

根據 Container 程序殺死的條件可知,在不存在子程序時,出現 killed by yarn 問題是於由 Executor(JVM) 程序自身記憶體超過向 Yarn 申請的記憶體總量 M 所致。由於未出現上一節所述的 OOM 異常,因此可判定其為 M1 (Overhead) 不足,依據 Yarn 記憶體使用情況有如下兩種方案:

法一、如果,M (spark.executor.memory) 未達到 Yarn 單個 Container 允許的上限時,可僅增加 M1(spark.yarn.executor.memoryOverhead),從而增加 M;如果,M 達到 Yarn 單個 Container 允許的上限時,增加 M1,降低 M2。

注意二者之各要小於 Container 監控記憶體量,否則伸請資源將被 yarn 拒絕。

法二、減少可用的 Core 的數量 N,使並行任務數減少,從而減少 Overhead 開銷

2、存在子程序

Spark 應用中 Container 以 Executor(JVM程序)的形式存在,因此根程序為 Executor 對應的程序,而 Spark 應用向Yarn申請的總資源M = M1 + M2,都是以 Executor(JVM) 程序(非程序樹)可用資源的名義申請的。申請的資源並非一次性全量分配給 JVM 使用,而是先為 JVM 分配初始值,隨後記憶體不足時再按比率不斷進行擴容,直致達到 Container 監控的最大記憶體使用量 M。當 Executor 中啟動了子程序(如呼叫 shell 等)時,子程序佔用的記憶體(記為 S)就被加入 Container 程序樹,此時就會影響 Executor 實際可使用記憶體資源(Executor 程序實際可使用資源變為:M - S),然而啟動 JVM 時設定的可用最大資源為 M,且 JVM 程序並不會感知 Container 中留給自己的使用量已被子程序佔用,因此,當 JVM 使用量達到M - S,還會繼續開劈記憶體空間,這就會導致 Executor 程序樹使用的總記憶體量大於 M 而被 Yarn 殺死。

典形場景有:

  1. PySpark(Spark已做記憶體限制,一般不會佔用過大記憶體)
  2. 自定義Shell呼叫

其解決方案分別為:

1) PySpark場景:

  • 如果,M 未達到 Yarn 單個 Container 允許的上限時,可僅增加 M1 ,從而增加 M;如果,M 達到 Yarn 單個 Container 允許的上限時,增加 M1,降低 M2。
  • 減少可用的 Core 的數量 N,使並行任務數減少,從而減少 Overhead 開銷

2) 自定義 Shell 場景:(OverHead 不足為假象)

調整子程序可用記憶體量 (通過單機測試,記憶體控制在 Container 監控記憶體以內,且為 Spark 保留記憶體等留有空間)。

推薦閱讀

  1. Apache Spark 記憶體管理詳解
  2. Spark on Yarn之Executor記憶體管理
  3. Apache Spark 統一記憶體管理模型詳解
  4. Spark 記憶體管理之UnifiedMemoryManager
  5. Tuning Spark-Memory Management Overview