1. 程式人生 > 實用技巧 >Spark開發-Spark記憶體溢位原因以及解決方式

Spark開發-Spark記憶體溢位原因以及解決方式

報錯情況

Container killed by YARN for exceeding memory limits. 1*.4 GB of 1* GB physical memory used. 
 Consider boosting spark.yarn.executor.memoryOverhead.

基本內容介紹:

1.executor 和 container
  01.Spark中的 executor 程序是跑在 container 中,所以container的最大記憶體會直接影響到executor的最大可用記憶體
  02. yarn.nodemanager.pmem-check-enabled 該引數預設是true,也就是會由它來控制監控container的記憶體使用
  03. yarn.scheduler.maximum-allocation-mb 設定值6114,也就是說單個container申請的最大記憶體是這麼多,
	   執行任務的時候你的executer需要的記憶體超出這個值,那麼就會被殺掉
	    container超過了記憶體的限制從而被kill掉
   04.executor執行的時候,用的記憶體可能會超過 executor-memory
        所以會為executor額外預留一部分記憶體。spark.yarn.executor.memoryOverhead 代表了這部分記憶體
		即實際的記憶體
		  val executorMem = args.executorMemory + executorMemoryOverhead
	05.memoryOverhead
    如果沒有設定 spark.yarn.executor.memoryOverhead ,則這部分的記憶體大小為
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
	   其中 MEMORY_OVERHEAD_FACTOR 預設為0.1, MEMORY_OVERHEAD_MIN 預設為384m executorMemory 為設定的 executor-memory
	    實際 executorMem= X+max(X*0.1,384)
		設定了的話 
		   executorMem=X +spark.yarn.executor.memoryOverhead  其中 X 是值 args.executorMemory
	06. executorMem 需要滿足的條件: executorMem< yarn.scheduler.maximum-allocation-mb 	

2.Yarn 中 contaimer 和 Spark中 partition 之間的關係   
   job會被切分成stages,每個stage切分成task,每個task單獨排程,可以把executor的jvm程序看做task執行池
   spark.executor.memory  每個executor使用的記憶體
    一個executor可以並行執行多個task,實際上一個executor是一個程序,task是executor裡的一個執行緒。
    一個task至少要獨佔executor裡的一個虛擬核心vcore, 一個task要佔用幾個核心,可以由.config("spark.task.cpus", 1)配置,預設是1即一個task佔用一個vcore
    同時並行執行的task最大數量 = executor數目 * (每個executor核數 / 每個task佔用核心數)
	總核數= executor-cores * num-executor
	 例如: 每個 executor具有3個 cores 理論上每個executor可以處理1-4個task
3.分割槽與Task的情況
     讀取階段
         01.從記憶體中建立 RDD:sc.parallelize(...),那麼預設的分割槽數量為該程式所分配的資源的 CPU 數量。
         02.如果是讀取hdfs的檔案,
             一般來說,partition的數量等於檔案的數量。
             如果單個檔案的大小大於hdfs的分塊大小,partition的數量就等於 “檔案大小/分塊大小”。
             同時,也可以使用rdd的repartition方法重新劃分partition。
     運算階段
        經過不同的運算元計算後,分割槽數目又會變化
        Task 的數量是由 Partition 決定的
	在Spark中有兩類task,一類是shuffleMapTask,一類是resultTask,
	     第一類task的輸出是shuffle所需資料,
		 第二類task的輸出是result,

可能的原因:

1、資料出現了傾斜等原因導致其中一個 contaimer 記憶體負載太大 執行失敗
2.	Spark的shuffle部分使用了netty框架進行網路傳輸,但netty會申請堆外記憶體快取 Shuffle時,
    每個Reduce都需要獲取每個map對應的輸出,
    當一個reduce需要獲取的一個map資料比較大 超出配置的限制就報了這個錯。
	  通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize 可設定每個 Reducer 讀取的目標資料量,其單位是位元組,預設值為 64 MB。

解決記憶體overhead的問題的方法是:

 1.將"spark.executor.memory" 從8g設定為12g。將記憶體調大
 2.將"spark.executor.cores"  從8設定為4。   將core的個數調小。
 3.將rdd/dateframe進行重新分割槽 。           重新分割槽(repartition)
 4.將"spark.yarn.executor.memoryOverhead"設定為最大值,可以考慮一下4096。這個數值一般都是2的次冪。

具體引數配置

set spark.sql.adaptive.repartition.enabled=true;
set spark.sql.shuffle.partitions=2000;
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864;

專有名詞解釋

1.常用配置
   配置任務可用executor數量
   每個Executor佔用記憶體
   每個Executor的core數目  spark.executor.cores
  
  The maximum memory size of container to running driver 
    is determined  by 
  the sum of 
      spark.driver.memoryOverhead 
      spark.driver.memory.

  The maximum memory size of container to running executor
   is determined by 
  the sum of 
      spark.executor.memory, 
      spark.executor.memoryOverhead, 
      spark.memory.offHeap.size 
	  spark.executor.pyspark.memory.
 Shuffle Behavior
 Memory Management
    spark.memory.fraction
	 在Spark中,執行和儲存共享一個統一的區域M
	   代表整體JVM堆記憶體中M的百分比(預設0.6)。
	    剩餘的空間(40%)是為使用者資料結構、Spark內部metadata預留的,並在稀疏使用和異常大記錄的情況下避免OOM錯誤
	spark.memory.storageFraction

Note: Non-heap memory includes off-heap memory (when spark.memory.offHeap.enabled=true)
   and memory used by other driver processes (e.g. python process that goes with a PySpark driver) 
   and memory used by other non-driver processes running in the same container

spark.executor.memoryOverhead
    This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.

spark.memory.offHeap.size
spark.memory.offHeap.enabled

原始碼

package org.apache.spark.deploy.yarn
    DRIVER_MEMORY_OVERHEAD
	EXECUTOR_MEMORY : Amount of memory to use per executor process
    EXECUTOR_MEMORY_OVERHEAD: The amount of off-heap memory to be allocated per executor in cluster mode
	EXECUTOR_CORES = ConfigBuilder("spark.executor.cores")
	EXECUTOR_MEMORY_OVERHEAD = ConfigBuilder("spark.yarn.executor.memoryOverhead")
     // Executor memory in MB.
      protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
      // Additional memory overhead.
      protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
        math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt

	// Resource capability requested for each executors
     private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)

package org.apache.spark.memory;
    public enum MemoryMode { ON_HEAP, OFF_HEAP}
	private[spark] abstract class MemoryManager(
      conf: SparkConf,
      numCores: Int,
      onHeapStorageMemory: Long,
      onHeapExecutionMemory: Long) extends Logging {
     # Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using sun.misc.Unsafe.
       final val tungstenMemoryMode: MemoryMode = {
         if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
           require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
             "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
           require(Platform.unaligned(),
             "No support for unaligned Unsafe. Set spark.memory.offHeap.enabled to false.")
           MemoryMode.OFF_HEAP
         } else {
           MemoryMode.ON_HEAP
         }
       }

參考:

  https://spark.apache.org/docs/latest/configuration.html
  https://spark.apache.org/docs/latest/running-on-yarn.html#configuration