1. 程式人生 > >Spark Shuffle 中 JVM 內存使用及配置內幕詳情

Spark Shuffle 中 JVM 內存使用及配置內幕詳情

數據緩存 案例 part png 配置 4條 resources CP 考題

本課主題

  • JVM 內存使用架構剖析
  • Spark 1.6.x 和 Spark 2.x 的 JVM 剖析
  • Spark 1.6.x 以前 on Yarn 計算內存使用案例
  • Spark Unified Memory 的運行原理和機制

引言

Spark 從1.6.x 開始對 JVM 的內存使用作出了一種全新的改變,Spark 1.6.x 以前是基於靜態固定的JVM內存使用架構和運行機制,如果你不知道 Spark 到底對 JVM 是怎麽使用,你怎麽可以很有信心地或者是完全確定地掌握和控制數據的緩存空間呢,所以掌握Spark對JVM的內存使用內幕是至關重要的。很多人對 Spark 的印象是:它是基於內存的,而且可以緩存一大堆數據

,顯現 Spark 是基於內存的觀點是錯的,Spark 只是優先充分地利用內存而已。如果你不知道 Spark 可以緩存多少數據,你就誤亂地緩存數據的話,肯定會有問題。

在數據規模已經確定的情況下,你有多少 Executor 和每個 Executor 可分配多少內存 (在這個物理硬件已經確定的情況下),你必須清楚知道你的內存最多能夠緩存多少數據;在 Shuffle 的過程中又使用了多少比例的緩存,這樣對於算法的編寫以及業務實現是至關重要的!!!

文章的後部份會介紹 Spark 2.x 版本 JVM 的內存使用比例,它被稱之為 Spark Unified Memory,這是統一或者聯合的意思,但是 Spark 沒有用 Shared 這個字,因為 A 和 B 進行 Unified 和 A 和 B 進行 Shared 其實是兩個不同的概念, Spark 在運行的時候會有不同類型的 OOM,你必須搞清楚這個 OOM 背後是由什麽導致的。 比如說我們使用算子 mapPartition 的時候,一般會創建一些臨時對象或者是中間數據,你這個時候使用的臨時對象和中間數據,是存儲在一個叫 UserSpace 裏面的用戶操作空間,那你有沒有想過這個空間的大小會導致應用程序出現 OOM 的情況,在 Spark 2.x 中 Broadcast 的數據是存儲在什麽地方;ShuffleMapTask 的數據又存儲在什麽地方,可能你會認為 ShuffleMapTask 的數據是緩存在 Cache 中。這篇文章會介紹 JVM 在 Spark 1.6.X 以前和 2.X 版本對 Java 堆的使用,還會逐一解密上述幾個疑問,也會簡單介紹 Spark 1.6.x 以前版本在 Spark On Yarn 上內存的使用案例,希望這篇文章能為讀者帶出以下的啟發:

  • 了解 JVM 內存使用架構剖析
  • 了解 JVM 在 Spark 1.6.x 以前和 Spark 2.x 中可以緩存多少數據
  • 了解 Spark Unified Memory 的原理與機制還有它三大核心空間的用途
  • 了解 Shuffle 在 Spark 1.6.x 以前和 Spark 2.x 中可以使用多少緩存
  • 了解 Spark1.6.x 以前 on Yarn 對內存的使用
  • 了解 在 Spark 1.6.x 以前和 Spark 2.x Shuffle 的參數配置

JVM 內存使用架構剖析

JVM 有很多不同的區,最開始的時候,它會通過類裝載器把類加載進來,在運行期數據區中有 "本地方法棧","程序計數器","Java 棧"、"Java 堆"和"方法區"

以及本地方法接口和它的本地庫。從 Spark 的角度來談代碼的運行和數據的處理,主要是談 Java 堆 (Heap) 空間的運用。

[下圖是JVM 內存架構圖]
技術分享圖片

  • 本地方法棧:這個是在叠歸的時候肯定是至關重要的;
  • 程序計數器:這是一個全區計數器,對於線程切換是至關重要的;
  • Java 棧 (Stack)Stack 區屬於線程私有,高效的程序一般都是並發的,每個線程都會包含一個 Stack 區域,Stack 區域中含有基本的數據類型以及對象的引用,其它線程均不能直接訪問該區域;Java 棧分為三大部份:基本數據類型區域、操作指令區域、上下文等;
  • Java 堆 (Heap):存儲的全部都是 Object 對象實例,對象實例中一般都包含了其數據成員以及與該對象對應類的信息,它會指向類的引用一個,不同線程肯定要操作這個對象;一個 JVM 實例在運行的時候只有一個 Heap 區域,而且該區域被所有的線程共享;補充說明:垃圾回收是回收堆 (heap) 中內容,堆上才有我們的對象
  • 方法區:又名靜態成員區域,包含整個程序的 class、static 成員等,類本身的字節碼是靜態的;它會被所有的線程共享和是全區級別的

Spark 1.6.x 和 2.x 的 JVM 剖析

Spark JVM 到底可以緩存多少數據

下圖顯示的是Spark 1.6.x 以前版本對 Java 堆 (heap) 的使用情況,左則是 Storage 對內存的使用,右則是 Shuffle 對內存的使用,這叫 StaticMemoryManagement,數據處理以及類的實體對象都存放在 JVM 堆 (heap) 中。

[下圖是 Spark 1.6x 以前版本對 JVM 堆 Storage 和 Shuffle 的使用分布]
技術分享圖片

Spark 1.6.x 版本對 JVM 堆的使用

JVM Heap 默認情況下是 512MB,這是取決於 spark.executor.memory 的參數,在回答 Spark JVM 到底可以緩存多少數據這個問題之前,首先了解一下 JVM Heap 在 Spark 中是如何分配內存比例的。無論你定義了 spark.executor.memory 的內存空間有多大,Spark 必然會定義一個安全空間,在默認情況下只會使用 Java 堆上的 90% 作為安全空間,在單個 Executor 的角度來講,就是 Heap Size x 90%。

埸景一:假設說在一個Executor,它可用的 Java Heap 大小是 10G,實際上 Spark 只能使用 90%,這個安全空間的比例是由spark.storage.safetyFaction 來控制的。(如果你內存的 Heap 非常大的話,可以嘗試調高為 95%),在安全空間中也會劃分三個不同的空間:一個是 Storage 空間、一個是 Unroll 空間和一個是 Shuffle 空間。

  • 安全空間 (safe):計算公式是 spark.executor.memory x spark.storage.safetyFraction。也就是 Heap Size x 90%,在埸景一的例子中是 10 x 0.9 = 9G;
  • 緩存空間 (Storage):計算公式是 spark.executor.memory x spark.storage.safetyFraction x spark.storage.memoryFraction。也就是 Heap Size x 90% x 60%;Heap Size x 54%,在埸景一的例子中是 10 x 0.9 x 0.6 = 5.4G;一個應用程序可以緩存多少數據是由 safetyFraction 和 memoryFraction 這兩個參數共同決定的。
    [下圖是 StaticMemoryManager.scala 中的 getMaxStorageMemory 方法]
    技術分享圖片
  • Unroll 空間
    • 計算公式是 spark.executor.memory x spark.storage.safetyFraction x spark.storage.memoryFraction x spark.storage.unrollFraction
      也就是 Heap Size x 90% x 60% x 20%;Heap Size x 10.8%,在埸景一的例子中是 10 x 0.9 x 0.6 x 0.2 = 1.8G,你可能把序例化後的數據放在內存中,當你使用數據時,你需要把序例化的數據進行反序例化。
      [下圖是 StaticMemoryManager.scala 中的 maxUnrollMemory 變量]
      技術分享圖片
    • 對 cache 緩存數據的影響是由於 Unroll 是一個優先級較高的操作,進行 Unroll 操作的時候會占用 cache 的空間,而且又可以擠掉緩存在內存中的數據 (如果該數據的緩存級別是 MEMORY_ONLY 的話,否則該數據會丟失)。
  • Shuffle 空間:計算公式是 spark.executor.memory x spark.shuffle.memoryFraction x spark.shuffle.safteyFraction。在 Shuffle 空間中也會有一個默認 80% 的安全空間比例,所以應該是 Heap Size x 20% x 80%;Heap Size x 16%,在埸景一的例子中是 10 x 0.2 x 0.8 = 1.6G;從內存的角度講,你需要從遠程抓取數據,抓取數據是一個 Shuffle 的過程,比如說你需要對數據進行排序,顯現在這個過程中需要內存空間。
    [下圖是 StaticMemoryManager.scala 中的 getMaxExecutionMemory 方法]
    技術分享圖片

Spark Unified Memory 原理和運行機制

下圖是一種叫聯合內存 (Spark Unified Memeory),數據緩存與數據執行之間的內存可以相互移動,這是一種更彈性的方式,下圖顯示的是 Spark 2.x 版本對 Java 堆 (heap) 的使用情況,數據處理以及類的實體對象存放在 JVM 堆 (heap) 中。

[下圖是 Spark 2.x 版本對 JVM 堆 Storage 和 Execution 的使用分布]

技術分享圖片

Spark 2.x 版本對 JVM 堆的使用

Spark 2.1.0 新型 JVM Heap 分成三個部份:Reserved Memory、User Memory 和 Spark Memory。

  • Reserved Memory:默認都是300MB,這個數字一般都是固定不變的,在系統運行的時候 Java Heap 的大小至少為 Heap Reserved Memory x 1.5. e.g. 300MB x 1.5 = 450MB 的 JVM配置。一般本地開發例如說在 Windows 系統上,建義系統至少 2G 的大小。
    [下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 RESERVED_SYSTEM_MEMORY_BYTES 參數]
    技術分享圖片
    SparkMemory空間默認是占可用 HeapSize 的 60%,與上圖顯示的75%有點不同,當然這個參數是可配置的!!
    [下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 getMaxMemory 方法]
    技術分享圖片
  • User Memory:寫 Spark 程序中產生的臨時數據或者是自己維護的一些數據結構也需要給予它一部份的存儲空間,你可以這麽認為,這是程序運行時用戶可以主導的空間,叫用戶操作空間。它占用的空間是 (Java Heap - Reserved Memory) x 25%(默認是25%,可以有參數供調優),這樣設計可以讓用戶操作時所需要的空間與系統框架運行時所需要的空間分離開。假設 Executor 有 4G 的大小,那麽在默認情況下 User Memory 大小是:(4G - 300MB) x 25% = 949MB,也就是說一個 Stage 內部展開後 Task 的算子在運行時最大的大小不能夠超過 949MB。例如工程師使用 mapPartition 等,一個 Task 內部所有算子使用的數據空間的大小如果大於 949MB 的話,那麽就會出現 OOM。思考題:有 100個 Executors 每個 4G 大小,現在要處理 100G 的數據,假設這 100G 分配給 100個 Executors,每個 Executor 分配 1G 的數據,這 1G 的數據遠遠少於 4G Executor 內存的大小,為什麽還會出現 OOM 的情況呢?那是因為在你的代碼中(e.g.你寫的應用程序算子)超過用戶空間的限制 (e.g. 949MB),而不是 RDD 本身的數據超過了限制。
  • Spark Memeory:系統框架運行時需要使用的空間,這是從兩部份構成的,分別是 Storage Memeory 和 Execution Memory。現在 Storage 和 Execution (Shuffle) 采用了 Unified 的方式共同使用了 (Heap Size - 300MB) x 75%,默認情況下 Storage 和 Execution 各占該空間的 50%。你可以從圖中可以看出,Storgae 和 Execution 的存儲空間可以往上和往下移動。
    定義:所謂 Unified 的意思是 Storgae 和 Execution 在適當時候可以借用彼此的 Memory,需要註意的是,當 Execution 空間不足而且 Storage 空間也不足的情況下,Storage 空間如果曾經使用了超過 Unified 默認的 50% 空間的話則超過部份會被強制 drop 掉一部份數據來解決 Execution 空間不足的問題 (註意:drop 後數據會不會丟失主要是看你在程序設置的 storage_level 來決定你是 Drop 到那裏,可能 Drop 到磁盤上),這是因為執行(Execution) 比緩存 (Storage) 是更重要的事情。
    [下圖是 UnifiedMemoryManager.scala 中 UnifiedMemoryManager 伴生對象裏的 apply 方法]
    技術分享圖片
    但是也有它的基本條件限制,Execution 向 Storage 借空間有兩種情況:具體代碼實現可以參考源碼補充 : Spark 2.1.X 中 Unified 和 Static MemoryManager
    [下圖是 Execution 向 Storage 借空間的第一種情況]
    技術分享圖片
    第一種情況:Storage 曾經向 Execution 借了空間,它緩存的數據可能是非常的多,然後 Execution 又不需要那麽大的空間 (默認情況下各占 50%),假設現在 Storage 占了 80%,Execution 占了 20%,然後 Execution 說自己空間不足,Execution 會向內存管理器發信號把 Storgae 曾經占用的超過 50%數據的那部份強制擠掉,在這個例子中擠掉了 30%
    [下圖是 Execution 向 Storage 借空間的第二種情況]
    技術分享圖片
    第二種情況:Execution 可以向 Storage Memory 借空間,在 Storage Memory 不足 50% 的情況下,Storgae Memory 會很樂意地把剩餘空間借給 Execution。相反當 Execution 有剩餘空間的時候,Storgae 也可以找 Execution 借空間。
    • Storage Memeory:相當於舊版本的 Storage 空間,在舊版本中 Storage 占了 54% 的 Heap 空間,這個空間會負責存儲 Persist、Unroll 以及 Broadcast 的數據。假設 Executor 有 4G 的大小,那麽 Storage 空間是:(4G - 300MB) x 75% x 50% = 1423.5MB 的空間,也就是說如果你的內存夠大的話,你可以擴播足夠大的變量,擴播對於性能提升是一件很重要的事情,因為它所有的線程都是共享的。從算子運行的角度來講,Spark 會傾向於數據直接從 Storgae Memeory 中抓取過來,這也就所謂的內存計算。
    • Execution Memeory:相當於舊版本的 Shuffle 空間,這個空間會負責存儲 ShuffleMapTask 的數據。比如說從上一個 Stage 抓取數據和一些聚合的操作、等等在舊版本中 Shuffle 占了 16% 的 Heap 空間。Execution 如果空間不足的情況下,除了選擇向 Storage Memory 借空間以外,也可以把一部份數據 Spill 到磁盤上,但很多時候基於性能調優方面的考慮都不想把數據 Spill 到磁盤上。思考題:你覺得是 Storgae 空間或者是 Execution 空間比較重要呢?

Spark1.6.x 以前 on Yarn 計算內存使用案例

這是一張 Spark 運行在 Yarn 上的架構圖,它有 Driver 和 Executor 部份,在 Driver 部份有一個內存控制參數,Spark 1.6.x 以前是spark.driver.memory,在實際生產環境下建義配置成 2G。如果 Driver 比較繁忙或者是經常把某些數據收集到 Driver 上的話,建義把這個參數調大一點。

圖的左邊是 Executor 部份,它是被 Yarn 管理的,每臺機制上都有一個 Node Manager;Node Manager 是被 Resources Manager 管理的,Resources Manager 的工作主要是管理全區級別的計算資源,計算資源核心就是內存和 CPU,每臺機器上都有一個 Node Manager 來管理當前內存和 CPU 等資源。Yarn 一般跟 Hadoop 藕合,它底層會有 HDFS Node Manager,主要是負責管理當前機器進程上的數據並且與HDFS Name Node 進行通信。

[下圖是 Spark on Yarn 的架構圖]
技術分享圖片

在每個節點上至少有兩個進程,一個是 HDFS Data Node,負責管理磁盤上的數據,另外一個是 Yarn Node Manager,負責管理執行進程,在這兩個 Node 的下面有兩個 Executors,每個 Executor 裏面運行的都是 Tasks。從 Yarn 的角度來講,會配置每個 Executor 所占用的空間,以防止資源競爭,Yarn 裏有一個叫 Node Memory Pool 的概念,可以配置 64G 或者是 128G,Node Memory Pool 是當前節點上總共能夠使用的內存大小。

圖中這兩個 Executors 在兩個不同的進程中 (JVM#1 和 JVM#2),裏面的 Task 是並行運行的,Task 是運行在線程中,但你可以配置 Task 使用線程的數量,e.g. 2條線程或者是4條線程,但默認情況下都是1條線程去處理一個Task,你也可以用 spark.executor.cores 去配置可用的 Core 以及 spark.executor.memory 去配置可用的 RAM 的大小。

在 Yarn 上啟動 Spark Application 的時候可以通過以下參數來調優:

  • -num-executor 或者 spark.executor.instances 來指定運行時所需要的 Executor 的個數;
  • -executor-memory 或者 spark.executor.memory 來指定每個 Executor 在運行時所需要的內存空間;
  • -executor-cores 或者是 spark.executor.cores 來指定每個 Executor 在運行時所需要的 Cores 的個數;
  • -driver-memory 或者是 spark.driver.memory 來指定 Driver 內存的大小;
  • spark.task.cpus 來指定每個 Task 運行時所需要的 Cores 的個數;

場景一:例如 Yarn 集群上有 32 個 Node 來運行的 Node Manager,每個 Node 的內存是 64G,每個 Node 的 Cores 是 32 Cores,假如說每個 Node 我們要分配兩個 Executors,那麽可以把每個 Executor 分配 28G,Cores 分配為 12 個 Cores,每個 Spark Task 在運行的時候只需要一個 Core 就行啦,那麽我們 32 個 Nodes 同時可以運行: 32 個 Node x 2 個 Executors x (12 個 Cores / 1) = 768 個 Task Slots,也就是說這個集群可以並行運行 768 個 Task,如果 Job 超過了 Task 可以並行運行的數量 (e.g. 768) 則需要排隊。那麽這個集群模可以緩存多少數據呢?從理論上:32 個 Node x 2 個 Executors x 28g x 90% 安全空間 x 60%緩存空間 = 967.68G,這個緩存數量對於普通的 Spark Job 而言是完全夠用的,而實際上在運行中可能只能緩存 900G 的數據,900G 的數據從磁盤儲存的角度數據有多大呢?還是 900G 嗎?不是的,數據一般都會膨脹好幾倍,這是和壓縮、序列化和反序列化框架有關,所以在磁盤上可能也就 300G 的樣子的數據。

總結

了解 Spark Shuffle 中的 JVM 內存使用空間對一個Spark應用程序的內存調優是至關重要的。跟據不同的內存控制原理分別對存儲和執行空間進行參數調優:spark.executor.memory, spark.storage.safetyFraction, spark.storage.memoryFraction, spark.storage.unrollFraction, spark.shuffle.memoryFraction, spark.shuffle.safteyFraction。

Spark 1.6 以前的版本是使用固定的內存分配策略,把 JVM Heap 中的 90% 分配為安全空間,然後從這90%的安全空間中的 60% 作為存儲空間,例如進行 Persist、Unroll 以及 Broadcast 的數據。然後再把這60%的20%作為支持一些序列化和反序列化的數據工作。其次當程序運行時,JVM Heap 會把其中的 80% 作為運行過程中的安全空間,這80%的其中20%是用來負責 Shuffle 數據傳輸的空間。

Spark 2.0 中推出了聯合內存的概念,最主要的改變是存儲和運行的空間可以動態移動。需要註意的是執行比存儲有更大的優先值,當空間不夠時,可以向對方借空間,但前提是對方有足夠的空間或者是 Execution 可以強制把 Storage 一部份空間擠掉。Excution 向 Storage 借空間有兩種方式:第一種方式是 Storage 曾經向 Execution 借了空間,它緩存的數據可能是非常的多,當 Execution 需要空間時可以強制拿回來;第二種方式是 Storage Memory 不足 50% 的情況下,Storgae Memory 會很樂意地把剩餘空間借給 Execution。

如果是你的計算比較復雜的情況,使用新型的內存管理 (Unified Memory Management) 會取得更好的效率,但是如果說計算的業務邏輯需要更大的緩存空間,此時使用老版本的固定內存管理 (StaticMemoryManagement) 效果會更好。

Spark Shuffle 中 JVM 內存使用及配置內幕詳情