Spark階段性總結
在大資料領域,只有深挖資料科學領域,走在學術前沿,才能在底層演算法和模型方面走在前面,從而佔據領先地位。
一、Spark專業術語定義
1.Application:Spark應用程式
指的是使用者編寫的Spark應用程式,包含了Driver功能程式碼和分佈在叢集中多個節點上執行的Executor程式碼。
Spark應用程式,由一個或多個作業JOB組成,如下圖所示:
2.Dirver:驅動程式
Spark中的Driver即執行上述Application的Main()函式並且建立SparkContext,其中建立SparkContext的目的是為了準備Spark應用程式的執行環境。在Spark中由SparkContext負責和ClusterManager通訊,進行資源的申請、任務的分配和監控等;當Executor部分執行完畢後,Driver負責將SparkContext關閉。
如果你是用spark shell,那麼當你啟動 Spark shell的時候,系統後臺自啟了一個 Spark 驅動器程式,就是在Spark shell 中預載入的一個叫作 sc 的 SparkContext 物件。如果驅動器程式終止,那麼Spark 應用也就結束了。
通常SparkContext代表Driver,如下圖所示:
3. Cluster Manager : 資源管理器
指的是在叢集上獲取資源的外部服務,常用的有:
Standalone,Spark原生的資源管理器,由Master負責資源的分配;
Haddop Yarn,由Yarn中的ResearchManager負責資源的分配;
Messos,由Messos中的Messos Master負責資源管理.
如下圖:
4.Executor:執行器
Application執行在Worker節點上的一個程序,該程序負責執行Task,並且負責將資料存在記憶體或者磁碟上,每個Application都有各自獨立的一批Executor.
如下圖:
5.Worker:計算節點
叢集中任何可以執行Application程式碼的節點,類似於Yarn中的NodeManager節點。在Standalone模式中指的就是通過Slave檔案配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點,在Spark on Messos模式中指的就是Messos Slave節點。
如下圖:
6.RDD:彈性分散式資料集
Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列運算元進行操作(主要有Transformation和Action操作).
如下圖
7.窄依賴
父RDD每一個分割槽最多被一個子RDD的分割槽所用;表現為一個父RDD的分割槽對應於一個子RDD的分割槽,或兩個父RDD的分割槽對應於一個子RDD 的分割槽。
如下圖
8.寬依賴
父RDD的每個分割槽都可能被多個子RDD分割槽所使用,子RDD分割槽通常對應所有的父RDD分割槽。
如下圖
常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :如果JoinAPI之前被呼叫的RDD API是寬依賴(存在shuffle), 而且兩個join的RDD的分割槽數量一致,join結果的rdd分割槽數量也一樣,這個時候join api是窄依賴)。
常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此之外的,rdd 的join api是寬依賴)。
9.DAG:有向無環圖
Directed Acycle graph,反應RDD之間的依賴關係
如圖所示
10.DAGScheduler:有向無環圖排程器
基於DAG劃分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將作業拆分成不同階段的具有依賴關係的多批任務;最重要的任務之一就是:計算作業和任務的依賴關係,制定排程邏輯。在SparkContext初始化的過程中被例項化,一個SparkContext對應建立一個DAGScheduler。
11.TaskScheduler:任務排程器
將Taskset提交給worker(叢集)執行並回報結果;負責每個具體任務的實際物理排程。
如下圖
12.Job:作業
由一個或多個排程階段所組成的一次計算作業;包含多個Task組成的平行計算,往往由Spark Action催生,一個JOB包含多個RDD及作用於相應RDD上的各種Operation。
呼叫RDD的一個action,如count,即觸發一個Job,spark中對應實現為ActiveJob,DAGScheduler中使用集合activeJobs和jobIdToActiveJob維護Job
13.Stage:排程階段
一個任務集對應的排程階段;每個Job會被拆分很多組Task,每組任務被稱為Stage,也可稱TaskSet,一個作業分為多個階段;Stage分成兩種型別ShuffleMapStage、ResultStage。
代表一個Job的DAG,會在發生shuffle處被切分,切分後每一個部分即為一個Stage,一個Job切分的結果是0個或多個ShuffleMapStage加一個ResultStage
14.TaskSet:任務集
由一組關聯的,但相互之間沒有Shuffle依賴關係的任務所組成的任務集。
提示:
1)一個Stage建立一個TaskSet;
2)為Stage的每個Rdd分割槽建立一個Task,多個Task封裝成TaskSet
15.Task:任務
被送到某個Executor上的工作任務;單個分割槽資料集上的最小處理流程單元。
最終被髮送到Executor執行的任務,和stage的ShuffleMapStage和ResultStage對應,其實現分為ShuffleMapTask和ResultTask
總體如圖所示:
二、Spark執行基本流程
三、Spark執行架構特點
1、Executor程序專屬
每個Application獲取專屬的executor程序,該程序在Application期間一直駐留,並以多執行緒方式執行tasks。Spark Application不能跨應用程式共享資料,除非將資料寫入到外部儲存系統。
2、支援多種資源管理器
Spark與資源管理器無關,只要能夠獲取executor程序,並能保持相互通訊就可以了,Spark支援資源管理器包含: Standalone、On Mesos、On YARN、Or On EC2
3、Job提交就近原則
提交SparkContext的Client應該靠近Worker節點(執行Executor的節點),最好是在同一個Rack(機架)裡,因為Spark Application執行過程中SparkContext和Executor之間有大量的資訊交換;如果想在遠端叢集中執行,最好使用RPC將SparkContext提交給叢集,不要遠離Worker執行SparkContext。
4、移動程式而非移動資料的原則執行
Task採用了資料本地性和推測執行的優化機制。關鍵方法:taskIdToLocations、getPreferedLocations。
四、Spark核心原理透視
1、計算流程
2、從程式碼構建DAG圖
public static void main(String[] args) {
String inputPath1 = "E:/ml-1m/ratings.dat";
String inputPath2 = "E:/ml-1m/users.dat";
String inputPath3 = "E:/ml-1m/movies.dat";
SparkSession session = SparkSession.builder().appName("test").master("local[*]").getOrCreate();
JavaPairRDD<String, Rating> lines1 = session.read().textFile(inputPath1).javaRDD().map(line -> line.split("::")).mapToPair(n -> {
return new Tuple2<String, Rating>(n[0], Rating.apply(Integer.valueOf(n[0]), Integer.valueOf(n[1]), Double.valueOf(n[2])));
});
JavaPairRDD<String, Rating> lines2 = session.read().textFile(inputPath2).javaRDD().map(line -> line.split("::")).mapToPair(n -> {
return new Tuple2<String, Rating>(n[0], Rating.apply(Integer.valueOf(n[0]), Integer.valueOf(n[1]), Double.valueOf(n[2])));
});
JavaPairRDD<String, Rating> lines3 = session.read().textFile(inputPath3).javaRDD().map(line -> line.split("::")).mapToPair(n -> {
return new Tuple2<String, Rating>(n[0], Rating.apply(Integer.valueOf(n[0]), Integer.valueOf(n[1]), Double.valueOf(n[2])));
});
JavaPairRDD<String, Rating> dtinone1 = lines2.union(lines3);
JavaPairRDD<String, Tuple2<Rating, Rating>> dtinone = lines1.join(dtinone1);
dtinone.saveAsTextFile("C:/a.txt");
dtinone.filter(n -> n != null).foreach(n -> {
});
}
Spark的計算髮生在RDD的Action操作,而對Action之前的所有Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。
Spark核心會在需要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。
3、將DAG劃分為Stage核心演算法
Application多個job多個Stage:Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage才會執行。
劃分依據:Stage劃分的依據就是寬依賴,何時產生寬依賴,reduceByKey, groupByKey等運算元,會導致寬依賴的產生。
核心演算法:從後往前回溯,遇到窄依賴加入本stage,遇見寬依賴進行Stage切分。Spark核心會從觸發Action操作的那個RDD開始從後往前推,首先會為最後一個RDD建立一個stage,然後繼續倒推,如果發現對某個RDD是寬依賴,那麼就會將寬依賴的那個RDD建立一個新的stage,那個RDD就是新的stage的最後一個RDD。然後依次類推,繼續繼續倒推,根據窄依賴或者寬依賴進行stage的劃分,直到所有的RDD全部遍歷完成為止。
4、將DAG劃分為Stage剖析
從HDFS中讀入資料生成3個不同的RDD,通過一系列transformation操作後再將計算結果儲存回HDFS。可以看到這個DAG中只有join操作是一個寬依賴,Spark核心會以此為邊界將其前後劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。
5、相關程式碼
6、提交Stages
排程階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler通過TaskScheduler介面提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的例項來管理這個任務集的生命週期,對於DAGScheduler來說,提交排程階段的工作到此就完成了。而TaskScheduler的具體實現則會在得到計算資源的時候,進一步通過TaskSetManager排程具體的任務到對應的Executor節點上進行運算。
7、相關程式碼
TaskSetManager負責管理TaskSchedulerImpl中一個單獨TaskSet,跟蹤每一個task,如果task失敗,負責重試task直到達到task重試次數的最多次數。
8、監控Job、Task、Executor
DAGScheduler監控Job與Task:要保證相互依賴的作業排程階段能夠得到順利的排程執行,DAGScheduler需要監控當前作業排程階段乃至任務的完成情況。這通過對外暴露一系列的回撥函式來實現的,對於TaskScheduler來說,這些回撥函式主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命週期資訊進一步維護作業和排程階段的狀態資訊。
DAGScheduler監控Executor的生命狀態:TaskScheduler通過回撥函式通知DAGScheduler具體的Executor的生命狀態,如果某一個Executor崩潰了,則對應的排程階段任務集的ShuffleMapTask的輸出結果也將標誌為不可用,這將導致對應任務集狀態的變更,進而重新執行相關計算任務,以獲取丟失的相關資料。
9、獲取任務執行結果
結果DAGScheduler:一個具體的任務在Executor中執行完畢後,其結果需要以某種形式返回給DAGScheduler,根據任務型別的不同,任務結果的返回方式也不同。
兩種結果,中間結果與最終結果:對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果本身,而對於中間排程階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus裡的相關儲存資訊,而非結果本身,這些儲存位置資訊將作為下一個排程階段的任務獲取輸入資料的依據。
兩種型別,DirectTaskResult與IndirectTaskResult:根據任務結果大小的不同,ResultTask返回的結果又分為兩類,如果結果足夠小,則直接放在DirectTaskResult物件內中,如果超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果作為一個數據塊存放在BlockManager中,然後將BlockManager返回的BlockID放在IndirectTaskResult物件中返回給TaskScheduler,TaskScheduler進而呼叫TaskResultGetter將IndirectTaskResult中的BlockID取出並通過BlockManager最終取得對應的DirectTaskResult。