1. 程式人生 > 實用技巧 >一個 Spark 應用程式的完整執行流程

一個 Spark 應用程式的完整執行流程

一個 Spark 應用程式的完整執行流程

1、編寫 Spark Application 應用程式
2、打 jar 包,通過 spark-submit 提交執行
3、SparkSubmit 提交執行
4、執行 Spark Application 的 main 方法
5、初始化 SparkContext,這一步主要是把執行 Application 所需要的一個 Driver 和多個 Executor 啟動起來
6、執行到 Action 運算元,這個階段會產生 DAG 血緣依賴關係,但是並沒有真正執行
7、執行 Action 運算元,生成一個 Job 提交執行
8、DAGScheduler 會對提交的 Job 進行 Stage 切分
9、TaskSchedule 通過 TaskSet 獲取 job 的所有 Task,然後序列化分給 Exector
.... 
shuffle

Application、Job、Stage 和 Task

1、Application:初始化一個 SparkContext 即生成一個 Application;
2、Job:一個 Action 運算元就會生成一個 Job;
3、Stage:Stage 等於寬依賴的個數加 1;
4、Task:一個 Stage 階段中,最後一個 RDD 的分割槽個數就是 Task 的個數。

注意:Application->Job->Stage->Task每一層都是1對n的關係

Spark Application 提交分析

入口:spark application 中的 action 運算元!(SparkPi 程式中的 reduce 函式)

以 SparkPi 程式舉例:reduce() 運算元就是提交 job 的入口

最後到:

dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

從此,任務的提交就交給了 dagScheduler

Spark App Stage 切分分析

入口:EventLoop 中的 eventQueue.take() 方法

如果任務提交,則有 JobSubmitted 事件提交到 eventQueue 中,則 eventQueue.take() 阻塞返回,此時的 event 就是 JobSubmitted。

根據事件機制,跳轉到:DAGScheduler.handleJobSubmitted()

兩個核心的方法:

// stage切分入口
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 提交stage執行入口
submitStage(finalStage)

方法依賴關係:

1、createResultStage(傳入finalRDD獲得ResultStage) ->2
2、getOrCreateParentStages(傳入rdd獲得父stage) ->3->4
	3、getShuffleDependencies(傳入rdd獲得寬依賴)
	4、getOrCreateShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->5->6
		5、getMissingAncestorShuffleDependencies(傳入一個rdd獲得所有寬依賴) ->3
		6、createShuffleMapStage(傳入寬依賴獲得ShuffleMapStage) ->2

Spark Task 分發和執行分析

入口:

taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
backend.reviveOffers()

總結一下:

1、使用者編寫 spark 應用程式
2、達成jar包
3、通過spark-submit 提交執行
4、sparkSessioin sparkContext 初始化
5、執行action運算元
6、sparkContext.runJob()
7、dagScheduler.handleJobSubmitted()
8、dagScheduler.runJob()
	createResultStage() stage切分
	submitStage()
9、taskScheduler.submitTasks(new TaskSet())
10、schedulerBackEnd.reviveOffers();
11、Driver傳送 LaunchTask 訊息給 Executor 
12、Executor 就會封裝Task 為一個 TaskRunner 物件,提交給該 Executor 的執行緒池執行
13、Executor 執行的Task 有可能是 ShuffleMapTask,也有可能是ResultTask
14、ShuffleMapTask 會後續的 Shuffle操作,具體有 Writer 完成

Spark Suffle 原始碼分析

入口:

Task.runTask()