1. 程式人生 > 其它 >Spark核心_05

Spark核心_05

通訊架構,任務排程機制,Shuffle解析,記憶體管理 目錄

1.Spark核心

1.1核心元件

Driver在Spark作業執行時主要負責:

  1. 將使用者程式轉化為作業(Job);
  2. 在Executor之間排程任務(Task);
  3. 跟蹤Executor的執行情況;
  4. 通過UI展示查詢執行情況;

Executor物件是負責在Spark作業中執行具體任務

  1. 負責執行組成Spark應用的任務
  2. 要求快取的 RDD 提供記憶體式儲存

1.2Spark通用執行流程概述

  1. 任務提交後,都會先啟動Driver程式;
  2. 隨後Driver向叢集管理器註冊應用程式;
  3. 之後叢集管理器根據此任務的配置檔案分配Executor並啟動;
  4. Driver開始執行main函式,Spark查詢為懶執行,當執行到Action運算元時開始反向推算,根據寬依賴進行Stage的劃分,隨後每一個Stage對應一個Taskset,
    Taskset中有多個Task,查詢可用資源Executor進行排程;
  5. 根據本地化原則,Task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通訊,報告任務執行情況。

1.3Standalone模式

Standalone叢集有2個重要組成部分,分別是:

1) Master(RM):是一個程序,主要負責資源的排程和分配,並進行叢集的監控等職責;

2) Worker(NM):是一個程序,一個Worker執行在叢集中的一臺伺服器上,主要負責兩個職責,一個是用自己的記憶體儲存RDD的某個或某些partition;
另一個是啟動其他程序和執行緒(Executor),對RDD上的partition進行並行的處理和計算。
  1. 在Standalone Cluster模式下,任務提交後,Master會找到一個Worker啟動Driver。
  2. Driver啟動後向Master註冊應用程式,Master在Worker啟動Executor
  3. Worker上的Executor啟動後會向Driver反向註冊,
  4. 所有的Executor註冊完成後,Driver開始執行main函式,之後執行到Action運算元時,開始劃分Stage,每個Stage生成對應的taskSet,之後將Task分發到各個Executor上執行。

在Standalone Client模式下,Driver在任務提交的本地機器上執行

1.4YARN排程

資源排程和分配交給了YARN來處理

YARN-CLENT

YARN-CLUSTER

2.Spark通訊架構

資料來源尚矽谷

相關知識

  1. BIO(Blocking I/O):阻塞式IO

    假設去飯店吃飯:老闆在給前面先來的人做飯,自己就找個位置坐下等著

  2. NIO(New I/O):非阻塞式IO

    老闆在給前面先來的人做飯,自己去幹別的事情,過一段時間來詢問老闆飯是否做好。幹別的事情不安寧,總要記著這個事情。

  3. AIO(Asynchronous I/O):非同步非阻塞式IO

​ 老闆在給前面先來的人做飯,和老闆約定好什麼時候給我飯,專心幹別的事情

Spark基於Netty通訊

Driver於Exceutor通訊的方式,發件箱和收件箱,發件箱與服務通訊

3. Spark任務排程機制

Driver執行緒主要是初始化SparkContext物件,準備執行所需的上下文

  1. 一方面保持與ApplicationMaster的RPC連線,通過ApplicationMaster申請資源,

  2. 另一方面排程任務,將任務下發到Executor上。

資源排程與任務分配

  1. 當ResourceManager向ApplicationMaster返回Container資源時
  2. ApplicationMaster就嘗試在對應的Container上啟動Executor程序,
  3. Executor程序起來後,會向Driver反向註冊,
  4. 註冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,將任務狀態上報給Driver。

3.1Spark任務排程概述

Job、Stage以及Task

  1. 遇到一個Action方法則觸發一個Job
  2. Stage以RDD寬依賴(即Shuffle)為界,遇到Shuffle做一次劃分
  3. 一個Stage對應一個TaskSet

Spark RDD通過其Transactions操作,形成了RDD血緣(依賴)關係圖,即DAG,最後通過Action的呼叫,觸發Job並排程執行,
執行過程中會建立兩個排程器:DAGScheduler和TaskScheduler。

  1. DAGScheduler負責Stage級的排程,主要是將job切分成若干Stages,並將每個Stage打包成TaskSet交給TaskScheduler排程。
  2. TaskScheduler負責Task級的排程,將DAGScheduler給過來的TaskSet,分發到Executor上執行

Driver初始化SparkContext過程中,會分別初始化DAGScheduler、TaskScheduler

3.2 Spark Stage級排程

當遇到一個Action操作後就會觸發一個Job的計算,並交給DAGScheduler來提交,根據DAG進行切分,將一個Job劃分為若干Stages

  1. 劃分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,
  2. 一類叫做ShuffleMapStage,為下游Stage準備資料
錯誤重試:
只有Executor丟失或者Task由於Fetch失敗才需要重新提交失敗的Stage以排程執行失敗的任務,其他型別的Task失敗會在TaskScheduler的排程過程中重試。

3.3Spark Task級排程

TaskScheduler會將TaskSet封裝為TaskSetManager

TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來排程任務。

3.3.1排程策略

一種是FIFO:將TaskSetManager按照先來先到的方式入隊,出隊時直接拿出最先進隊的TaskSetManager

一種是FAIR:TaskSetMagager進行排序,要排序的TaskSetMagager物件包含三個屬性: runningTasks值(正在執行的Task數)、minShare值、weight值,
綜合考量三值進行排序

TaskSetManager封裝了一個Stage的所有Task,並負責管理排程這些Task。

Spark排程總是會盡量讓每個task以最高的本地性級別來啟動

  • 同一個Executor
  • 同一個節點
  • 同一個機架的兩個節點上

3.2 失敗重試

  1. Task被提交到Executor啟動執行後,
  2. Executor會將執行狀態上報給SchedulerBackend,
  3. SchedulerBackend則告訴TaskScheduler,TaskScheduler
  4. 到該Task對應的TaskSetManager,並通知到該TaskSetManager

TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數

在記錄Task失敗次數過程中,會記錄它上一次失敗所在的Executor Id和Host,這樣下次再排程這個Task時,會使用黑名單機制,避免它被排程到上一次失敗的節點上

4. Spark Shuffle解析

1.ShuffleMapStage的結束伴隨著shuffle檔案的寫磁碟。
2.ResultStage對應action運算元,即將一個函式應用在RDD的各個partition的資料集上,意味著一個job的執行結束

4.1HashShuffle

1.未優化的

Task 開始那邊各自進行 Hash 計算,每個task得到3個分類

2.優化的

啟用合併機制,合併機制就是複用buffer

在同一個程序中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer裡

然後把Buffer中的資料寫入以Core數量為單位的本地檔案中,(一個Core只有一種型別的Key的資料)

4.2SortShuffle

1.普通SortShuffle

  • 在溢寫磁碟前,先根據key進行排序,排序過後的資料,會分批寫入到磁碟檔案中
  • 也就是說一個Task過程會產生多個臨時檔案
  • 最後在每個Task中,將所有的臨時檔案合併,這就是merge過程
  • 此過程將所有臨時檔案讀取出來,一次寫入到最終檔案

意味著一個Task的所有資料都在這一個檔案中。同時單獨寫一份索引檔案,標識下游各個Task的資料在檔案中的索引,start offset和end offset。

2.bypass SortShuffle

觸發條件

  • shuffle reduce task數量小於觸發引數的閾值
  • 不是聚合類的shuffle運算元

此時task會為每個reduce端的task都建立一個臨時磁碟檔案

該過程的磁碟寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要建立數量驚人的磁碟檔案,只是在最後會做一個磁碟檔案的合併而已

而該機制與普通SortShuffleManager執行機制的不同在於:不會進行排序,shuffle write過程中,不需要進行資料的排序操作,也就節省掉了這部分的效能開銷

5.Spark記憶體管理

5.1堆內記憶體和堆外記憶體

堆內記憶體受到JVM統一管理,

堆外記憶體是直接向作業系統進行記憶體的申請和釋放。

Spark對堆內記憶體的規劃

  • 這些任務在快取 RDD 資料和廣播(Broadcast)資料時佔用的記憶體被規劃為儲存(Storage)記憶體,

  • 這些任務在執行 Shuffle 時佔用的記憶體被規劃為執行(Execution)記憶體

  • 剩餘的部分不做特殊規劃


Spark對堆內記憶體的管理是一種邏輯上的”規劃式”的管理,物件例項佔用記憶體的申請和釋放都由JVM完成

Spark只能在申請後和釋放前記錄這些記憶體。

​ 申請記憶體流程如下:

  • Spark 在程式碼中 new 一個物件例項;

  • JVM 從堆內記憶體分配空間,建立物件並返回物件引用;

  • Spark 儲存該物件的引用,記錄該物件佔用的記憶體。

    釋放記憶體流程如下:

  • Spark記錄該物件釋放的記憶體,刪除該物件的引用;

  • 等待JVM的垃圾回收機制釋放該物件佔用的堆內記憶體。

Spark 通過對儲存記憶體和執行記憶體各自獨立的規劃管理,可以決定是否要在儲存記憶體裡快取新的 RDD,以及是否為新的任務分配執行記憶體,在一定程度上
可以提升記憶體的利用率,減少異常的出現。

堆外記憶體是直接向作業系統進行記憶體的申請和釋放。

為了進一步優化記憶體的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)記憶體,使之可以直接在工作節點的系統記憶體中開闢空間

5.2統一記憶體管理

儲存記憶體和執行記憶體共享同一塊空間,可以動態佔用對方的空閒區域

雙方的空間都不足時,則儲存到硬碟;若己方空間不足而對方空餘時,可借用對方的空間

Task在啟動之初讀取一個分割槽時,會先判斷這個分割槽是否已經被持久化,如果沒有則需要檢查Checkpoint 或按照血統重新計算

Driver端的Master負責整個Spark應用程式的Block的元資料資訊的管理和維護,而Executor端的Slave需要將Block的更新等狀態上報到Master,
同時接收Master 的命令,例如新增或刪除一個RDD。

RDD 在快取到儲存記憶體之後,Partition 被轉換成Block,Record在堆內或堆外儲存記憶體中佔用一塊連續的空間。將Partition由不連續的儲存空間
轉換為連續儲存空間的過程,Spark稱之為"展開"(Unroll)。

Spark的儲存記憶體和執行記憶體有著截然不同的管理方式:

  • 對於儲存記憶體來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要快取的 RDD的Partition轉化而成;

  • 而對於執行記憶體,Spark用AppendOnlyMap來儲存 Shuffle過程中的資料,在Tungsten排序中甚至抽象成為頁式記憶體管理,開闢了全新的
    JVM記憶體管理機制。

2021.11.27 15:43