Spark核心_05
1.Spark核心
1.1核心元件
Driver在Spark作業執行時主要負責:
- 將使用者程式轉化為作業(Job);
- 在Executor之間排程任務(Task);
- 跟蹤Executor的執行情況;
- 通過UI展示查詢執行情況;
Executor物件是負責在Spark作業中執行具體任務
- 負責執行組成Spark應用的任務
- 要求快取的 RDD 提供記憶體式儲存
1.2Spark通用執行流程概述
- 任務提交後,都會先啟動Driver程式;
- 隨後Driver向叢集管理器註冊應用程式;
- 之後叢集管理器根據此任務的配置檔案分配Executor並啟動;
- Driver開始執行main函式,Spark查詢為懶執行,當執行到Action運算元時開始反向推算,根據寬依賴進行Stage的劃分,隨後每一個Stage對應一個Taskset,
Taskset中有多個Task,查詢可用資源Executor進行排程; - 根據本地化原則,Task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通訊,報告任務執行情況。
1.3Standalone模式
Standalone叢集有2個重要組成部分,分別是:
1) Master(RM):是一個程序,主要負責資源的排程和分配,並進行叢集的監控等職責; 2) Worker(NM):是一個程序,一個Worker執行在叢集中的一臺伺服器上,主要負責兩個職責,一個是用自己的記憶體儲存RDD的某個或某些partition; 另一個是啟動其他程序和執行緒(Executor),對RDD上的partition進行並行的處理和計算。
- 在Standalone Cluster模式下,任務提交後,Master會找到一個Worker啟動Driver。
- Driver啟動後向Master註冊應用程式,Master在Worker啟動Executor
- Worker上的Executor啟動後會向Driver反向註冊,
- 所有的Executor註冊完成後,Driver開始執行main函式,之後執行到Action運算元時,開始劃分Stage,每個Stage生成對應的taskSet,之後將Task分發到各個Executor上執行。
在Standalone Client模式下,Driver在任務提交的本地機器上執行
1.4YARN排程
資源排程和分配交給了YARN來處理
YARN-CLENT
YARN-CLUSTER
2.Spark通訊架構
相關知識
-
BIO(Blocking I/O):阻塞式IO
假設去飯店吃飯:老闆在給前面先來的人做飯,自己就找個位置坐下等著
-
NIO(New I/O):非阻塞式IO
老闆在給前面先來的人做飯,自己去幹別的事情,過一段時間來詢問老闆飯是否做好。幹別的事情不安寧,總要記著這個事情。
-
AIO(Asynchronous I/O):非同步非阻塞式IO
老闆在給前面先來的人做飯,和老闆約定好什麼時候給我飯,專心幹別的事情
Spark基於Netty通訊
Driver於Exceutor通訊的方式,發件箱和收件箱,發件箱與服務通訊
3. Spark任務排程機制
Driver執行緒主要是初始化SparkContext物件,準備執行所需的上下文
-
一方面保持與ApplicationMaster的RPC連線,通過ApplicationMaster申請資源,
-
另一方面排程任務,將任務下發到Executor上。
資源排程與任務分配
- 當ResourceManager向ApplicationMaster返回Container資源時
- ApplicationMaster就嘗試在對應的Container上啟動Executor程序,
- Executor程序起來後,會向Driver反向註冊,
- 註冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,將任務狀態上報給Driver。
3.1Spark任務排程概述
Job、Stage以及Task
- 遇到一個Action方法則觸發一個Job
- Stage以RDD寬依賴(即Shuffle)為界,遇到Shuffle做一次劃分
- 一個Stage對應一個TaskSet
Spark RDD通過其Transactions操作,形成了RDD血緣(依賴)關係圖,即DAG,最後通過Action的呼叫,觸發Job並排程執行,
執行過程中會建立兩個排程器:DAGScheduler和TaskScheduler。
- DAGScheduler負責Stage級的排程,主要是將job切分成若干Stages,並將每個Stage打包成TaskSet交給TaskScheduler排程。
- TaskScheduler負責Task級的排程,將DAGScheduler給過來的TaskSet,分發到Executor上執行
Driver初始化SparkContext過程中,會分別初始化DAGScheduler、TaskScheduler
3.2 Spark Stage級排程
當遇到一個Action操作後就會觸發一個Job的計算,並交給DAGScheduler來提交,根據DAG進行切分,將一個Job劃分為若干Stages
- 劃分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,
- 一類叫做ShuffleMapStage,為下游Stage準備資料
錯誤重試:
只有Executor丟失或者Task由於Fetch失敗才需要重新提交失敗的Stage以排程執行失敗的任務,其他型別的Task失敗會在TaskScheduler的排程過程中重試。
3.3Spark Task級排程
TaskScheduler會將TaskSet封裝為TaskSetManager
TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來排程任務。
3.3.1排程策略
一種是FIFO:將TaskSetManager按照先來先到的方式入隊,出隊時直接拿出最先進隊的TaskSetManager
一種是FAIR:TaskSetMagager進行排序,要排序的TaskSetMagager物件包含三個屬性: runningTasks值(正在執行的Task數)、minShare值、weight值,
綜合考量三值進行排序
TaskSetManager封裝了一個Stage的所有Task,並負責管理排程這些Task。
Spark排程總是會盡量讓每個task以最高的本地性級別來啟動
- 同一個Executor
- 同一個節點
- 同一個機架的兩個節點上
3.2 失敗重試
- Task被提交到Executor啟動執行後,
- Executor會將執行狀態上報給SchedulerBackend,
- SchedulerBackend則告訴TaskScheduler,TaskScheduler
- 到該Task對應的TaskSetManager,並通知到該TaskSetManager
TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數
在記錄Task失敗次數過程中,會記錄它上一次失敗所在的Executor Id和Host,這樣下次再排程這個Task時,會使用黑名單機制,避免它被排程到上一次失敗的節點上
4. Spark Shuffle解析
1.ShuffleMapStage的結束伴隨著shuffle檔案的寫磁碟。
2.ResultStage對應action運算元,即將一個函式應用在RDD的各個partition的資料集上,意味著一個job的執行結束
4.1HashShuffle
1.未優化的
Task 開始那邊各自進行 Hash 計算,每個task得到3個分類
2.優化的
啟用合併機制,合併機制就是複用buffer
在同一個程序中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡
然後把Buffer中的資料寫入以Core數量為單位的本地檔案中,(一個Core只有一種型別的Key的資料)
4.2SortShuffle
1.普通SortShuffle
- 在溢寫磁碟前,先根據key進行排序,排序過後的資料,會分批寫入到磁碟檔案中
- 也就是說一個Task過程會產生多個臨時檔案
- 最後在每個Task中,將所有的臨時檔案合併,這就是merge過程
- 此過程將所有臨時檔案讀取出來,一次寫入到最終檔案
意味著一個Task的所有資料都在這一個檔案中。同時單獨寫一份索引檔案,標識下游各個Task的資料在檔案中的索引,start offset和end offset。
2.bypass SortShuffle
觸發條件
- shuffle reduce task數量小於觸發引數的閾值
- 不是聚合類的shuffle運算元
此時task會為每個reduce端的task都建立一個臨時磁碟檔案
該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已
而該機制與普通SortShuffleManager執行機制的不同在於:不會進行排序,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷
5.Spark記憶體管理
5.1堆內記憶體和堆外記憶體
堆內記憶體受到JVM統一管理,
堆外記憶體是直接向作業系統進行記憶體的申請和釋放。
Spark對堆內記憶體的規劃
-
這些任務在快取 RDD 資料和廣播(Broadcast)資料時佔用的記憶體被規劃為儲存(Storage)記憶體,
-
這些任務在執行 Shuffle 時佔用的記憶體被規劃為執行(Execution)記憶體
-
剩餘的部分不做特殊規劃
Spark對堆內記憶體的管理是一種邏輯上的”規劃式”的管理,物件例項佔用記憶體的申請和釋放都由JVM完成,
Spark只能在申請後和釋放前記錄這些記憶體。
申請記憶體流程如下:
-
Spark 在程式碼中 new 一個物件例項;
-
JVM 從堆內記憶體分配空間,建立物件並返回物件引用;
-
Spark 儲存該物件的引用,記錄該物件佔用的記憶體。
釋放記憶體流程如下:
-
Spark記錄該物件釋放的記憶體,刪除該物件的引用;
-
等待JVM的垃圾回收機制釋放該物件佔用的堆內記憶體。
Spark 通過對儲存記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在儲存記憶體裡快取新的 RDD,以及是否為新的任務分配執行記憶體,在一定程度上
可以提升記憶體的利用率,減少異常的出現。
堆外記憶體是直接向作業系統進行記憶體的申請和釋放。
為了進一步優化記憶體的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間
5.2統一記憶體管理
儲存記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閒區域
雙方的空間都不足時,則儲存到硬碟;若己方空間不足而對方空餘時,可借用對方的空間
Task在啟動之初讀取一個分割槽時,會先判斷這個分割槽是否已經被持久化,如果沒有則需要檢查Checkpoint 或按照血統重新計算
Driver端的Master負責整個Spark應用程式的Block的元資料資訊的管理和維護,而Executor端的Slave需要將Block的更新等狀態上報到Master,
同時接收Master 的命令,例如新增或刪除一個RDD。
RDD 在快取到儲存記憶體之後,Partition 被轉換成Block,Record在堆內或堆外儲存記憶體中佔用一塊連續的空間。將Partition由不連續的儲存空間
轉換為連續儲存空間的過程,Spark稱之為"展開"(Unroll)。
Spark的儲存記憶體和執行記憶體有著截然不同的管理方式:
-
對於儲存記憶體來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要快取的 RDD的Partition轉化而成;
-
而對於執行記憶體,Spark用AppendOnlyMap來儲存 Shuffle過程中的資料,在Tungsten排序中甚至抽象成為頁式記憶體管理,開闢了全新的
JVM記憶體管理機制。
2021.11.27 15:43