Spark中的記憶體管理(一)
一個Spark應用執行的過程如下所示:
- Driver
使用者的主程式提交到Driver中執行,在Driver中建立SparkContext,SparkContext初始化DAGScheduler和TaskScheduler,作為coordinator負責從AppMaster申請資源,並將作業的Task排程到Executor上面執行。
在yarn-cluster模式下,AppMaster中包含了Driver,在YARN中啟動,spark-submit客戶端kill掉不影響程式的執行;
在yarn-client模式下,Driver在spark-submit的客戶端啟動(不在YARN中),跟AppMaster是分離的,spark-submit客戶端kill掉會導致Spark程式掛掉(如spark-sql/spark-shell等都是以yarn-client的方式提交)
Executor上面執行的每個MapTask結束後都會有MapStatus彙報給Driver, 當MapTask數量非常多的時候可能會導致Driver出現OOM,此時需要調整Driver的記憶體大小,通過--conf spark.driver.memory=4G
或者--driver-memory 4G
來進行設定。
- Executor
實際執行Task的節點,Executor的個數由--conf spark.executor.instances=4
或者--num-executors 4
來設定;每個Executor裡面併發跑的Task個數由--conf spark.executor.cores=2
--executor-cores
指定。
Executor的記憶體由--conf spark.executor.memory=4G
或者--executor-memory 4G
設定。
Spark記憶體管理
上面介紹了Spark中兩個角色(Driver/Executor),其中Executor是實際執行Task的節點,Spark記憶體管理主要在Executor上面。
Executor記憶體使用結構
如上圖所示, Spark on YARN模式下一個Executor的記憶體使用情況:
整個Executor是YARN的一個container,所以它的總記憶體受yarn.scheduler.maximum-allocation-mb
當用戶提交作業的時候通過spark.executor.memory
引數設定了executor的堆記憶體(heapsize),這部分記憶體的使用情況如上圖所示:
- 系統預留(固定300MB)
詳見SPARK-12081 - spark.memory.fraction
該引數控制executor內使用者計算(execution)和儲存(storage)總佔用多少記憶體,即(M-R)*spark.memory.fraction
大小的記憶體; 剩餘的(M-R)*(1-spark.memory.fraction)用於Spark內部的metadata以及使用者資料結構等使用
對於spark.executor.memroyOverhead
,它是executor可額外使用的堆外(off-heap)記憶體,比如spark的shuffle過程使用的netty就會使用到堆外記憶體,如果程式有遇到相關的oom錯誤,可以嘗試調大該引數。該記憶體不屬於上面spark.executor.memory
(on-heap),但是它們的總和不能超過yarn.scheduler.maximum-allocation-mb
.
execution/storage記憶體管理
上圖中execution/storage的記憶體((M-R)*spark.memroy.fraction
)是Task在executor中執行需要用到的記憶體,它們通過UnifiedMemoryManager
這個統一記憶體管理器來管理。
UnifiedMemoryManager
中的execution和storage的管理沒有硬性的邊界控制(比如execution固定佔比多少),它們之間是一個軟邊界,初始的邊界由spark.memory.storageFraction
來設定(預設0.5),但這個並不是一個固定的邊界:
a) 當execution不夠的時候,可以從storage側借記憶體,如storage基本沒使用(如沒有cache資料等),execution可以從storage借記憶體甚至全部都借完,即使後續有storage需要用記憶體也不能強制從execution拿回,除非execution後續自己釋放了部分記憶體,storage才能拿來使用;
b) 當storage不夠的時候,如果execution有空閒多餘的記憶體,則也可以借,但是後續如果execution又需要更多記憶體了則可以強制從storage拿回記憶體(如可以將storge的資料寫到磁碟,然後釋放對應的記憶體),直到storage使用的記憶體減少到spark.memory.storageFraction
的比例。
Task記憶體管理
一個Executor可以同時併發執行多個Task(通過spark.executor.cores
控制),而每個Task在執行的過程中都需要從Executor申請記憶體來使用,那Executor如何將記憶體分配給併發執行的多個Task呢? 這塊留到下一篇文章來介紹。