Spark原始碼《三》Stage劃分
當發生shuffle時,sc.runJob-->DAGScheduler.runJob-->submitStage(),提交stage時,
會首先判斷是否有未執行的父stage,如果沒有呼叫submitMissingTasks提交stage
如果有則呼叫submitStage()先提交父stage
1.sc.runJob
當有action運算元時,會呼叫sc.runJob方法,下圖是在原始碼中搜sc.runJob的結果
我們去SparkContext看下runJob方法:
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { //fun:引數為迭代器 runJob(rdd, func, 0 until rdd.splits.size, false) }
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
//fun:引數為TaskContext例項和迭代器
runJob(rdd, func, 0 until rdd.splits.size, false)
}
def runJob[T, U: ClassManifest]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) }
def runJob[T, U: ClassManifest]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], allowLocal: Boolean ): Array[U] = { logInfo("Starting job...") val start = System.nanoTime//提供相對精確的計時 val result = scheduler.runJob(rdd, func, partitions, allowLocal) logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s") result }
可以看到有四個過載的方法,但最終都會呼叫第四個,從第四個可以看到呼叫了scheduler.runJob方法
private var scheduler: Scheduler = {
//正則表示式
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
master match {
case "local" => //如果是local,建立一個執行緒的本地排程器,失敗重試
new LocalScheduler(1, 0)
case LOCAL_N_REGEX(threads) => //如果是local[n],建立n個執行緒的本地排程器
new LocalScheduler(threads.toInt, 0)
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>//如果是local[n,m],m為重試次數
new LocalScheduler(threads.toInt, maxFailures.toInt)
case _ =>//否則建立mesos排程器
MesosNativeLibrary.load()
new MesosScheduler(this, master, frameworkName)
}
}
2.DAGScheduler.runJob
因為MesosScheduler和LocalScheduler都繼承了DAGScheduler,直接去看DAGScheduler的runJob
程式碼有點多,就貼重要的出來
override def runJob[T, U](
finalRdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean)
(implicit m: ClassManifest[U]): Array[U] = {
lock.synchronized {
....定義的變數,陣列,集合
submitStage(finalStage)
....
}
接下來看submitStage()方法,
該方法主要先判斷waiting(HashSet儲存等待執行的stage)和running(HashSet儲存正在執行的stage)是否包含該stage,
如果都不包含,則呼叫getMissingParentStages獲取該stage還未執行的父stage,
如果沒有未執行的父stage,呼叫submitMissingTasks提交stage,並將stage加入running列表
如果有未執行的父stage,先提交父stage執行,並將stage加入waiting
def submitStage(stage: Stage) {
if (!waiting(stage) && !running(stage)) {
val missing = getMissingParentStages(stage)
if (missing == Nil) {
logInfo("Submitting " + stage + ", which has no missing parents")
submitMissingTasks(stage)
running += stage
} else {
for (parent <- missing) {
submitStage(parent)
}
waiting += stage
}
}
}
接下來先看getMissingParentStages方法,該方法主要用於獲取stage未執行的父stage,劃分stage
visit()方法首先遍歷未被劃分stage的rdd的依賴,
如果是shuffle依賴,呼叫getShuffleMapStage獲取父stage,
如果是窄依賴,繼續呼叫visit()方法,直到發生shuffle
可以看出stage與父stage劃分的依賴就是是否發生shuffle
def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]//儲存未執行的父stage
val visited = new HashSet[RDD[_]]//儲存已經劃分stage的RDD
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {//首先判斷是否劃分過
visited += rdd//新增到集合
val locs = getCacheLocs(rdd)
for (p <- 0 until rdd.splits.size) {//遍歷分割槽
if (locs(p) == Nil) {
for (dep <- rdd.dependencies) {//遍歷依賴
dep match {
case shufDep: ShuffleDependency[_,_,_] =>//shuffle依賴
val stage = getShuffleMapStage(shufDep)//獲取父stage
if (!stage.isAvailable) {
//isAvailable判斷為真的條件是:無父stage並且無shuffle依賴
missing += stage//將stage加入為執行的父stage佇列
}
case narrowDep: NarrowDependency[_] =>//窄依賴
visit(narrowDep.rdd)//父rdd繼續呼叫visit方法
}
}
}
}
}
}
visit(stage.rdd)
missing.toList
}
接下來看getShuffleMapStage()方法,該方法是發生shuffle時獲取父stage,
shuffleToMapStage是HashMap儲存(shuffleId,stage)鍵值對,首先取key為該shuffleId的值
有值,直接返回取到的stage
無值,新建Stage並將該stage加入shuffleToMapStage中
def getShuffleMapStage(shuf: ShuffleDependency[_,_,_]): Stage = {
shuffleToMapStage.get(shuf.shuffleId) match {
case Some(stage) => stage
case None =>
val stage = newStage(shuf.rdd, Some(shuf))
shuffleToMapStage(shuf.shuffleId) = stage
stage
}
}