Spark Execution Flow
This article gives a brief walkthrough of Spark's job execution process, based on the source code and the book 《圖解Spark核心技術與案例實戰》.
The code under analysis is shown below: a simple word count program.
// Runnable as a single class, e.g. WordCount.java (imports added for completeness):
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class WordCount {
public static void main( String[] args )
{
String logFile = "wordcount.txt";
SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> logData = sc.textFile(logFile).cache();
JavaRDD<String> words = logData.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) throws Exception {
return Arrays.asList(s.split(" "));
}
});
JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
// Trigger the action: the job is submitted here
List<Tuple2<String,Integer>> counts= wordCount.collect();
for (Tuple2<String, Integer> stringIntegerTuple2 : counts) {
System.out.println(stringIntegerTuple2._1+":"+stringIntegerTuple2._2);
}
sc.stop();
}
}
// wordcount.txt
one
two two
three three three
four four four four
five five five five five
six six six six six six
Key Classes
First, a brief look at several important classes, so the exploration that follows has a clear target. The descriptions only cover the angle needed for the runtime analysis and are not meant to be exhaustive.
RDD
The real computation only begins after an action is invoked on an RDD; this is the classic lazy model. In the code above, execution formally starts inside the RDD's collect() call, while the earlier flatMap, mapToPair() and similar operations merely build up the DAG.
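A quick way to observe this laziness is a minimal sketch like the one below, reusing the sc from the example above and additionally importing org.apache.spark.api.java.function.Function; the print inside the transformation only runs once an action is triggered.
// Transformations only build the DAG; nothing is computed until an action runs.
JavaRDD<String> lines = sc.textFile("wordcount.txt");
JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
    @Override
    public Integer call(String s) throws Exception {
        // Not printed when map() returns -- only once an action submits the job.
        System.out.println("computing length of: " + s);
        return s.length();
    }
});
System.out.println("DAG built, nothing computed yet");
long n = lengths.count();   // count() is an action: the job is submitted here
System.out.println("number of lines: " + n);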
SparkContext
SparkContext is the core of the Driver. Calls made on an RDD ultimately turn into calls on SparkContext, which uses DAGScheduler and TaskScheduler to parse and schedule the DAG.
DAGScheduler
DAGScheduler parses the DAG and divides it into scheduling stages. A scheduling stage (stage) can be seen as a group of tasks that can run in parallel. In this example, flatMap and the following mapToPair() belong to one stage, while reduceByKey belongs to another. Stages are split along wide (shuffle) dependencies. Once the split is done, DAGScheduler wraps each stage as a set of tasks and sends them to TaskScheduler.
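To see where the stage boundary falls for the word-count job, you can print the lineage of the final RDD. This is only a small sketch; toDebugString() and dependencies() exist on the RDD API, but the exact output format depends on the Spark version.
// The ShuffledRDD introduced by reduceByKey marks the boundary between the two stages.
System.out.println(wordCount.toDebugString());
// The final RDD's dependency is a ShuffleDependency (wide); flatMap/mapToPair
// only add narrow dependencies and therefore stay in the same stage.
System.out.println(wordCount.rdd().dependencies());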
TaskScheduler
TaskScheduler is responsible for scheduling tasks: after receiving the tasks sent over by DAGScheduler, it distributes them to the Executors for execution.
ResultStage and ShuffleMapStage
ResultStage and ShuffleMapStage are the two kinds of scheduling stage produced by parsing the DAG. A ResultStage is the stage generated for the final RDD; a ShuffleMapStage is an earlier stage that ends at a shuffle. The operations on the two sides of a shuffle cannot be placed in the same stage (they form a wide dependency and cannot run in parallel), which is why the split happens there. A job without any shuffle therefore consists of a single ResultStage; a job that does shuffle has at least one ShuffleMapStage in addition to its ResultStage.
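As a hedged illustration of that rule, the sketch below runs against the same sc (the filter job is hypothetical and not part of the original program; it also uses org.apache.spark.api.java.function.Function).
// A job with no shuffle: filter + count runs as a single ResultStage.
long nonEmpty = sc.textFile("wordcount.txt")
        .filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return !s.isEmpty();
            }
        })
        .count();

// The word-count job, by contrast, contains one shuffle (reduceByKey), so its
// collect() runs one ShuffleMapStage (textFile -> flatMap -> mapToPair) followed
// by one ResultStage that produces the final counts.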
The Execution Process
The Submission Phase
Submitting the DAG to DAGScheduler
In the example above, calling collect triggers the execution of the job. The first phase of execution is submission; the call chain inside collect is shown below.
RDD.scala
/**
* Return an array that contains all of the elements in this RDD.
* The first step is a call to SparkContext's runJob method.
*/
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
SparkContext.scala
/**
* After a few intermediate calls inside SparkContext we land in this overload; as the body shows, it essentially delegates to dagScheduler.runJob.
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark. The allowLocal
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
DAGScheduler.scala
A thin wrapper call that adds logging statements; it has no effect on the actual execution.
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
// Call submitJob to post the job onto the event queue
val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
waiter.awaitResult() match {
case JobSucceeded =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case JobFailed(exception: Exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
throw exception
}
}
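The waiter.awaitResult() call above is what turns the asynchronous submission back into a blocking call for the action. Below is a minimal sketch of that idea, using a hypothetical SimpleJobWaiter rather than the real JobWaiter class.
import java.util.concurrent.CountDownLatch;

// Hypothetical, simplified stand-in for JobWaiter: the scheduler thread reports
// success or failure, while the action thread blocks in awaitResult().
class SimpleJobWaiter {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Exception failure;

    // Called by the scheduler when the last task of the job has finished.
    void jobSucceeded() {
        done.countDown();
    }

    // Called by the scheduler when the job fails.
    void jobFailed(Exception e) {
        failure = e;
        done.countDown();
    }

    // Called by the action (e.g. collect()) to block until the job completes.
    void awaitResult() throws Exception {
        done.await();
        if (failure != null) {
            throw failure;
        }
    }
}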
DAGScheduler.scala
Here an event-loop helper class is used: the job submission is posted onto the event loop as a JobSubmitted event.
/**
* Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
* can be used to block until the the job finishes executing or can be used to cancel the job.
*/
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
allowLocal: Boolean,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
DAGScheduler.scala
This is where the events are received. It is a producer/consumer queue: eventProcessLoop above is backed by a blocking queue, and each event taken from the queue is dispatched to a different handler. This function therefore shows how DAGScheduler behaves at each point in a job's lifecycle. (A simplified sketch of this producer/consumer pattern follows the quoted code.)
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
dagScheduler.handleTaskSetFailed(taskSet, reason)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
}
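Below is a minimal sketch of the producer/consumer pattern that eventProcessLoop is built on. It is a simplified stand-in rather than Spark's actual event-loop class, and the names used here are hypothetical.
import java.util.concurrent.LinkedBlockingDeque;

// Simplified stand-in for Spark's event loop: post() is the producer side
// (e.g. submitJob posting JobSubmitted), and the daemon thread is the consumer
// side that hands each event to onReceive() (e.g. handleJobSubmitted).
abstract class SimpleEventLoop<E> {
    private final LinkedBlockingDeque<E> queue = new LinkedBlockingDeque<E>();
    private volatile boolean stopped = false;

    private final Thread eventThread = new Thread("simple-event-loop") {
        @Override
        public void run() {
            while (!stopped) {
                try {
                    E event = queue.take();   // blocks until an event is posted
                    onReceive(event);
                } catch (InterruptedException e) {
                    return;                   // stop() interrupts the thread
                }
            }
        }
    };

    void start() {
        eventThread.setDaemon(true);
        eventThread.start();
    }

    void stop() {
        stopped = true;
        eventThread.interrupt();
    }

    // Producer side: callers post events and return immediately.
    void post(E event) {
        queue.offer(event);
    }

    // Consumer side: subclasses dispatch on the event type, like onReceive above.
    protected abstract void onReceive(E event);
}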
DAGScheduler parses the DAG into stages
The handleJobSubmitted method is where the RDD graph is turned into stages, mostly inside the newResultStage call it wraps.
/**
* Create a ResultStage -- either directly for use as a result stage, or as part of the
* (re)-creation of a shuffle map stage in newOrUsedShuffleStage. The stage will be associated
* with the provided jobId.
*/
private def newResultStage(
rdd: RDD[_],
numTasks: Int,
jobId: Int,
callSite: CallSite): ResultStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
stageIdToStage(id) = stage
// The earlier (parent) stages are resolved here
updateJobIdStageIdMaps(jobId, stage)
stage
}
Stage resolution proceeds backwards: starting from the final ResultStage it walks towards the front of the lineage looking for shuffle operations, and creates a ShuffleMapStage for each shuffle it finds.
/**
* Registers the given jobId among the jobs that need the given stage and
* all of that stage's ancestors.
*/
private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
def updateJobIdStageIdMapsList(stages: List[Stage]) {
if (stages.nonEmpty) {
val s = stages.head
s.jobIds += jobId
jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
// This call is where the main stage-resolution algorithm is implemented
val parents: List[Stage] = getParentStages(s.rdd, jobId)
// Filter out stages already tagged with this job (including the stage itself)
val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }
updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
}
}
updateJobIdStageIdMapsList(List(stage))
}
Note the nested helper function (a closure): the getParentStages() method it calls is where stage resolution is actually implemented.
/**
* Get or create the list of parent stages for a given RDD. The stages will be assigned the
* provided jobId if they haven't already been created with a lower jobId.
*/
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new Stack[RDD[_]]
// Traverse the dependencies (depth-first, via the explicit stack) and create the parent stages
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// Kind of ugly: need to register RDDs with the cache here since
// we can't do it in its constructor because # of partitions is unknown
// Walk over all of this RDD's dependencies
for (dep <- r.dependencies) {
dep match {
// For a shuffle dependency, add a ShuffleMapStage to parents
case shufDep: ShuffleDependency[_, _, _] =>
parents += getShuffleMapStage(shufDep, jobId)
// Otherwise push the parent RDD onto the stack to be visited later
case _ =>
waitingForVisit.push(dep.rdd)
}
}
}
}
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
parents.toList
}
The code above traverses the DAG (that is, the RDD dependency graph) with an explicit stack (a depth-first walk that avoids StackOverflowError from deep recursion) and creates a stage whenever it encounters a shuffle dependency. The shuffle found this way need not be a direct dependency of the final RDD: every shuffle reachable through narrow dependencies from the ResultStage ends up in parents. So how are each parent stage's own dependencies handled? By recursively calling getParentStages() again, through the getShuffleMapStage call chain quoted after the sketch below.
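To make the traversal easier to follow outside of the DAGScheduler code, here is a heavily simplified, self-contained sketch of the same idea (the Node and Dep types are hypothetical stand-ins, not Spark classes): walk the dependency graph with an explicit stack, cut a new parent stage at every wide (shuffle) dependency, and keep walking through narrow dependencies within the current stage.
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for an RDD and its dependencies, for illustration only.
class Node {
    final String name;
    final List<Dep> deps = new ArrayList<Dep>();
    Node(String name) { this.name = name; }
}

class Dep {
    final Node parent;
    final boolean isShuffle;   // true = wide (shuffle) dependency, false = narrow
    Dep(Node parent, boolean isShuffle) { this.parent = parent; this.isShuffle = isShuffle; }
}

class StageSplitter {
    // Same shape as getParentStages(): a stack-based traversal in which every shuffle
    // dependency reachable without crossing another shuffle starts a new parent stage.
    static List<Node> parentStageRoots(Node finalRdd) {
        List<Node> parents = new ArrayList<Node>();
        Set<Node> visited = new HashSet<Node>();
        Deque<Node> waitingForVisit = new ArrayDeque<Node>();
        waitingForVisit.push(finalRdd);
        while (!waitingForVisit.isEmpty()) {
            Node r = waitingForVisit.pop();
            if (visited.add(r)) {
                for (Dep dep : r.deps) {
                    if (dep.isShuffle) {
                        parents.add(dep.parent);          // stage boundary: new parent stage root
                    } else {
                        waitingForVisit.push(dep.parent); // narrow: same stage, keep walking
                    }
                }
            }
        }
        return parents;
    }
}
Applied to the word-count DAG, the only parent-stage root this sketch finds for the final RDD is the pre-shuffle (mapToPair) RDD, mirroring the single ShuffleMapStage created for reduceByKey.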
/**
* Get or create a shuffle map stage for the given shuffle dependency's map side.
* The jobId value passed in will be used if the stage doesn't already exist with
* a lower jobId (jobId always increases across jobs.)
* getParentStages() calls this function when it needs to create a new shuffle stage
*/
private def getShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage = {
shuffleToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, jobId)
// Then register current shuffleDep
// Create a ShuffleMapStage for this shuffle dependency
val stage = newOrUsedShuffleStage(shuffleDep, jobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage
stage
}
}
/**
* Create a shuffle map Stage for the given RDD. The stage will also be associated with the
* provided jobId. If a stage for the shuffleId existed previously so that the shuffleId is
* present in the MapOutputTracker, then the number and location of available outputs are
* recovered from the MapOutputTracker
*/
private def newOrUsedShuffleStage(
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.size
// Create a new ShuffleMapStage
val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
for (i <- 0 until locs.size) {
stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
}
stage.numAvailableOutputs = locs.count(_ != null)
} else {
// Kind of ugly: need to register RDDs with the cache and map output tracker here
// since we can't do it in the RDD constructor because # of partitions is unknown
logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size)
}
stage
}
/**
* This method eventually leads back to a call of updateJobIdStageIdMaps(), making the whole process recursive
*/
private def newShuffleMapStage(
rdd: RDD[_],
numTasks: Int,
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int,
callSite: CallSite): ShuffleMapStage = {
val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
jobId, callSite, shuffleDep)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
At this point the stage resolution is complete; the remaining work is to turn the stages into tasks and hand them to TaskScheduler for scheduling.
/** Submits stage, but first recursively submits any missing parents. */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
// Check whether this stage still has missing parent stages; if so, the parents must run first
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
// No missing parent stages: submit this stage
submitMissingTasks(stage, jobId