Spark: DAGScheduler Principles and Source Code Analysis
How a Job Is Triggered: Principles and Source Code
We use a wordcount example to trace how a Spark job is triggered.
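For reference, here is the complete driver this section traces; a minimal sketch assembled from the snippets below (the SparkConf setup is an assumption, and the input path is left as the placeholder used in the original):

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("wordcount")) // hypothetical setup
    // Transformations only build up the RDD lineage; nothing executes yet.
    var linesRDD = sc.textFile("hdfs://")
    var wordsRDD = linesRDD.flatMap(line => line.split(" "))
    var pairsRDD = wordsRDD.map(word => (word, 1))
    var countRDD = pairsRDD.reduceByKey(_ + _)
    // foreach is an action: it calls sc.runJob and triggers the job.
    countRDD.foreach(count => println(count._1 + ":" + count._2))
    sc.stop()
  }
}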
Code: var linesRDD = sc.textFile("hdfs://")
The textFile method in SparkContext:
/**
 * The hadoopFile call creates a HadoopRDD whose elements are (key, value) pairs:
 * key is the offset of each line within the HDFS/text file, value is the text line itself.
 * We then call map to drop the key and keep only the value, ending up with a
 * MapPartitionsRDD whose contents are the text lines.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  // Hadoop MapReduce TextInputFormat: one way of reading text data.
  // LongWritable is the offset of the line read (its position), Text is the line itself.
  // In the map, pair is the (offset, text) tuple; pair._2.toString extracts the text.
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString)
}
The hadoopFile method:
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  // The HadoopRDD reads its configuration through the broadcast variable created
  // above, so each worker can read it locally.
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
The map in hadoopFile(...).map(...) ultimately calls the map method in RDD.scala:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
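The closure (context, pid, iter) => iter.map(cleanF) shows that map is implemented as a per-partition iterator transformation. A minimal sketch of the behavioral equivalence (assuming a live SparkContext named sc, e.g. in spark-shell):

// map simply hides the per-partition iterator that mapPartitions exposes.
val doubled1 = sc.parallelize(1 to 4).map(_ * 2)
val doubled2 = sc.parallelize(1 to 4).mapPartitions(iter => iter.map(_ * 2))
println(doubled1.collect().toSeq == doubled2.collect().toSeq) // true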
Code: var wordsRDD = linesRDD.flatMap(line => line.split(" "))
The flatMap method in RDD.scala:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
Code: var pairsRDD = wordsRDD.map(word => (word, 1))
The map method in RDD.scala:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
Code: var countRDD = pairsRDD.reduceByKey(_ + _)
RDD itself has no reduceByKey method; an implicit conversion (analogous to a wrapper class in Java) is involved. The compiler searches the scope for an implicit conversion and finds this one in RDD.scala:
implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
new PairRDDFunctions(rdd)
}
The call thus ends up on a PairRDDFunctions instance: invoking reduceByKey on a MapPartitionsRDD triggers Scala's implicit-conversion lookup in scope, the MapPartitionsRDD is wrapped into a PairRDDFunctions, and its reduceByKey method is called.
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
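A minimal standalone sketch of the same implicit-conversion mechanism (hypothetical Wrapper/intToWrapper names, not Spark code):

import scala.language.implicitConversions

class Wrapper(val n: Int) { def doubled: Int = n * 2 }

object ImplicitDemo {
  // Analogous to rddToPairRDDFunctions: converts an Int into a Wrapper.
  implicit def intToWrapper(n: Int): Wrapper = new Wrapper(n)

  def main(args: Array[String]): Unit = {
    // Int has no `doubled` method, so the compiler searches the implicit scope
    // and rewrites 21.doubled as intToWrapper(21).doubled - exactly how
    // pairsRDD.reduceByKey(_ + _) becomes rddToPairRDDFunctions(pairsRDD).reduceByKey(_ + _).
    println(21.doubled) // 42
  }
}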
Code: countRDD.foreach(count => println(count._1 + ":" + count._2))
The foreach method in RDD.scala:
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
From there, several nested runJob overloads are called:
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// Core: call the runJob method of the DAGScheduler created during SparkContext initialization
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
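Every action funnels into this overload chain. As an illustration, here is a hedged sketch (assuming a live SparkContext sc) that calls sc.runJob directly, much like RDD.count() does internally:

// Count elements per partition by submitting a job directly.
val rdd = sc.parallelize(1 to 100, 4)
val perPartitionCounts: Array[Long] =
  sc.runJob(rdd, (iter: Iterator[Int]) => iter.size.toLong)
println(perPartitionCounts.sum) // 100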
Analysis of the Stage Division Algorithm
Because Spark operators are generally composed in chains, Spark must decide how to execute these chained computations. Its strategy is to first divide the operators into stages and then execute stage by stage.
Algorithm summary: start from the RDD on which the action was triggered and work backwards. First, a stage (stage1) is created for the final RDD. Then, while walking backwards, whenever a wide dependency on some RDD is found, a new stage (stage0) is created, with that RDD as the new stage's last RDD. This continues recursively: stages are split at wide dependencies and extended through narrow dependencies, until every RDD has been traversed.
The resulting stage division can be inspected in the Spark Web UI.
Source Code Walkthrough
Step 1: The runJob method of the DAGScheduler class
Source: org.apache.spark.scheduler.DAGScheduler
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
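Note that runJob blocks on the JobWaiter until the job completes, so each action is synchronous on its calling thread. A hedged sketch (assuming a live SparkContext sc) of running two jobs concurrently from separate driver threads:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Each Future submits its own job; the DAGScheduler stays consistent because
// all job events are serialized through its event loop (shown below).
val f1 = Future { sc.parallelize(1 to 10).count() } // job 1
val f2 = Future { sc.parallelize(1 to 10).sum() }   // job 2
println(Await.result(f1, Duration.Inf) + Await.result(f2, Duration.Inf))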
Step 2: The submitJob method of the DAGScheduler class
private[spark] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
taskScheduler.setDAGScheduler(this)
......
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// Post a JobSubmitted event to eventProcessLoop (the event loop)
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
// This event encapsulates which partitions to compute, the JobListener for job monitoring, etc.
private[scheduler] case class JobSubmitted(
jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties = null)
extends DAGSchedulerEvent
......
Step 3: The post method of the event loop (EventLoop)
// A double-ended blocking queue; elements can be enqueued and dequeued at either end
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
private val stopped = new AtomicBoolean(false)
// Exposed for testing.
private[spark] val eventThread = new Thread(name) {
setDaemon(true)
override def run(): Unit = {
try {
while (!stopped.get) {
// Receive events. onReceive is not implemented here; the concrete
// implementation is DAGSchedulerEventProcessLoop#onReceive
val event = eventQueue.take() // take the next posted event off the queue
try {
onReceive(event)
} catch {
case NonFatal(e) =>
try {
onError(e)
} catch {
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
} catch {
case ie: InterruptedException => // exit even if eventQueue is not empty
case NonFatal(e) => logError("Unexpected error in " + name, e)
}
}
}
......
def post(event: E): Unit = {
eventQueue.put(event)
}
Step 4: The onReceive method
protected def onReceive(event: E): Unit // abstract method; the subclass implementation is invoked
Step 5: The subclass (DAGSchedulerEventProcessLoop) implementation of onReceive
Source: org.apache.spark.scheduler.DAGScheduler.DAGSchedulerEventProcessLoop
/**
* The main event loop of the DAG scheduler.
*/
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
doOnReceive(event)
} finally {
timerContext.stop()
}
}
Step 6: The doOnReceive method
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
......
}
The DAGScheduler starts an EventLoop thread (the event loop) that continuously takes events off the event queue. Events are enqueued via EventLoop's post method; when the event thread takes one, it calls back into onReceive, which in turn calls doOnReceive to handle it.
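A minimal standalone sketch of this event-loop pattern (a hypothetical MiniEventLoop, not Spark's org.apache.spark.util.EventLoop, though that class works the same way):

import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

class MiniEventLoop[E](name: String)(handler: E => Unit) {
  private val eventQueue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take() // blocks until an event is posted
          handler(event)                // plays the role of doOnReceive
        }
      } catch {
        case _: InterruptedException => // exit when stop() interrupts take()
      }
    }
  }

  def start(): Unit = eventThread.start()
  def post(event: E): Unit = eventQueue.put(event)
  def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
}

// Usage: events posted from any thread are handled one at a time.
val loop = new MiniEventLoop[String]("mini-dag-event-loop")(e => println(s"handled: $e"))
loop.start()
loop.post("JobSubmitted")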
Step 7: The handleJobSubmitted method
/**
 * The core entry point of the DAGScheduler's job scheduling
 */
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_], // finalRDD is the last RDD of the job, the one on which the action was triggered
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
// Create the stage from the last RDD, the one that triggered the job
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
// Step 1: create a ResultStage (the output stage) and register it in the DAGScheduler's internal caches
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: BarrierJobSlotsNumberCheckFailed =>
logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
"than the total number of slots in the cluster currently.")
// If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
new BiFunction[Int, Int, Int] {
override def apply(key: Int, value: Int): Int = value + 1
})
if (numCheckFailures <= maxFailureNumTasksCheck) {
messageScheduler.schedule(
new Runnable {
override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
partitions, callSite, listener, properties))
},
timeIntervalNumTasksCheck,
TimeUnit.SECONDS
)
return
} else {
// Job failed, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
listener.jobFailed(e)
return
}
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// Job submitted, clear internal data.
barrierJobIdToNumTasksCheckFailures.remove(jobId)
// Step 2: create an ActiveJob from finalStage, which is this job's last stage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
// Step 3: register the job in the in-memory caches
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// Step 4: submit finalStage via submitStage()
submitStage(finalStage)
}
Step 8: The createResultStage method
/**
* Create a ResultStage associated with the provided jobId.
*/
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
checkBarrierStageWithDynamicAllocation(rdd)
checkBarrierStageWithNumSlots(rdd)
checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
// Get or create the parent stages
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage // register the stage in the stageIdToStage HashMap
updateJobIdStageIdMaps(jobId, stage) // update the jobId -> stageIds mapping
stage
}
Step 9: The submitStage method, the entry point of the stage division algorithm
/** Submits stage, but first recursively submits any missing parents. */
/**
 * Stage division starts here: submitStage and getMissingParentStages together
 * implement the stage division algorithm.
 */
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// Call getMissingParentStages() to get this stage's missing parent stages,
// sorted by id ascending so they are computed front to back; this ordering
// matters because the RDDs of later stages depend on earlier ones
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
// Submit the earliest stage (stage0) first; all remaining stages sit in waitingStages at this point
submitMissingTasks(stage, jobId.get)
} else {
// Recursively submit each missing parent stage
for (parent <- missing) {
submitStage(parent)
}
// Put the current stage into the queue of stages waiting to run
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
Step 10: The getMissingParentStages method
/**
 * Get the missing parent stages of a given stage.
 *
 * If all the dependencies of the stage's last RDD are narrow, no new shuffle stage is created;
 * when a wide dependency is found, a new stage is created from the RDD on the
 * wide-dependency side and returned immediately.
 */
private def getMissingParentStages(stage: Stage): List[Stage] = {
// A HashSet holding the missing parent stages found so far
val missing = new HashSet[Stage]
// RDDs already visited; the construction walks backwards from the last RDD,
// and each RDD is recorded once it has been traversed
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
// This only records RDD lineage; nothing is computed here, since we do not
// yet have the partition data
val waitingForVisit = new ArrayStack[RDD[_]] // RDDs waiting to be processed
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd // not yet visited: record this RDD in the HashSet
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) { // only recurse if some partitions of this RDD are not cached
// Iterate over the RDD's dependencies
for (dep <- rdd.dependencies) {
dep match {
// For a wide dependency, create a new stage from the RDD on the shuffle side.
// By default the last stage is a ResultStage, not a ShuffleMapStage;
// all stages before finalStage are ShuffleMapStages
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
// For a narrow dependency, push the dependent RDD onto the stack (same stage)
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
// Push the stage's last RDD onto the stack
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
// call the local visit method
visit(waitingForVisit.pop())
}
// Return all missing parent stages of this stage, for the recursive lookup that follows
missing.toList
}
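You can observe the dependency types this traversal matches on from any driver program; a short sketch (assuming a live SparkContext sc):

import org.apache.spark.ShuffleDependency

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2))).reduceByKey(_ + _)
// reduceByKey introduces a ShuffleDependency (wide); map/flatMap do not.
pairs.dependencies.foreach {
  case s: ShuffleDependency[_, _, _] => println(s"wide dependency, shuffleId=${s.shuffleId}")
  case d                             => println(s"narrow dependency: ${d.getClass.getSimpleName}")
}
println(pairs.toDebugString) // prints the lineage, indented at shuffle boundaries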
Step 11: The getOrCreateShuffleMapStage method
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
// Create a ShuffleMapStage
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
}
}
Using the figure below as an example, let's walk through stage division in detail.
First iteration:
1. RDD G comes in and is pushed onto the stack.
waitingForVisit.push(stage.rdd)
2. The stack is non-empty, so RDD G is popped and passed into the visit function.
while (waitingForVisit.nonEmpty) {
// call the local visit method
visit(waitingForVisit.pop())
}
3. RDD G has not been visited yet, so the body of the if executes.
val waitingForVisit = new ArrayStack[RDD[_]] // RDDs waiting to be processed
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
4. RDD G is processed and added to visited.
visited += rdd
5. Iterate over the parent RDDs that RDD G depends on.
for (dep <- rdd.dependencies) {
dep match {
// For RDD F (wide dependency): create a new stage and record it in missing
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
// For RDD B (narrow dependency): it belongs to the same stage as RDD G, so push RDD B onto the stack
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
RDD G depends on two RDDs:
- The dependency on RDD B is narrow, so no new stage is produced; RDD B is simply pushed onto the stack.
- The dependency on RDD F is wide, so RDD G and RDD F are divided into two stages. The shuffle dependency is recorded in missing, and RDD F's stage (Stage 2) becomes the parent of RDD G's stage (Stage 3).
Second iteration: the stack now holds RDD B.
RDD B's parent is RDD A, and the dependency between them is wide, so a new stage, Stage 1, is created.
At this point missing holds two stages: Stage 1 and Stage 2.
Recursing into Stage 1: RDD A has no parent RDD, so the stage is submitted.
// Recurse until the earliest stage, which has no parent stage
for (parent <- missing) {
submitStage(parent)
}
Recursing into Stage 2: all of RDD F's parent dependencies are narrow, so no new stages are produced; they all belong to Stage 2.
Stage division algorithm summary:
- Work backwards from finalStage
- Split off a new stage at every wide dependency
- Use recursion to submit parent stages first (see the sketch below)
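To make the backward traversal concrete, here is a self-contained sketch using a toy lineage model (hypothetical Node/Dep classes, not Spark's) that reproduces the division of the A-G example; it runs as-is in the Scala REPL:

import scala.collection.mutable

sealed trait Dep { def parent: Node }
case class Narrow(parent: Node) extends Dep // e.g. map, flatMap, union
case class Wide(parent: Node) extends Dep   // e.g. reduceByKey, groupByKey
case class Node(name: String, deps: Seq[Dep] = Nil)

// Walk backwards from the final node: narrow deps stay in the current stage,
// wide deps recursively start a new parent stage - the same rule that
// submitStage/getMissingParentStages apply.
def divideStages(finalNode: Node): List[Set[String]] = {
  val stages = mutable.ListBuffer[Set[String]]()
  def buildStage(root: Node): Unit = {
    val inStage = mutable.Set[String]()
    val toVisit = mutable.Stack[Node](root)
    while (toVisit.nonEmpty) {
      val node = toVisit.pop()
      if (inStage.add(node.name)) {
        node.deps.foreach {
          case Narrow(p) => toVisit.push(p) // same stage
          case Wide(p)   => buildStage(p)   // new parent stage
        }
      }
    }
    stages.prepend(inStage.toSet)
  }
  buildStage(finalNode)
  stages.toList // the stage containing the action comes first
}

// The A-G lineage: A ->wide B ->narrow G; C->D and E ->narrow F ->wide G
val a = Node("A"); val b = Node("B", Seq(Wide(a)))
val c = Node("C"); val d = Node("D", Seq(Narrow(c))); val e = Node("E")
val f = Node("F", Seq(Narrow(d), Narrow(e)))
val g = Node("G", Seq(Narrow(b), Wide(f)))
println(divideStages(g)) // three stages: {G,B}, {A}, {F,D,C,E} (element order may vary)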
Task Locality Algorithm
1. In submitMissingTasks, task locality is obtained via the following code.
// Submit the stage and create a batch of tasks for it; the task count equals the partition count
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
// Determine which partitions must be computed, i.e. how many tasks to create
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
// Add the stage to the runningStages set
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
// Compute task locality: the best-location algorithm for each task
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
// If there are tasks to execute, record the submission time of the stage. Otherwise,
// post the even without the submission time, which indicates that this stage was
// skipped.
if (partitionsToCompute.nonEmpty) {
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
}
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
var partitions: Array[Partition] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
var taskBinaryBytes: Array[Byte] = null
// taskBinaryBytes and partitions are both effected by the checkpoint status. We need
// this synchronization in case another concurrent job is checkpointing this RDD, so we get a
// consistent view of both variables.
RDDCheckpointData.synchronized {
taskBinaryBytes = stage match {
case stage: ShuffleMapStage =>
JavaUtils.bufferToArray(
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
case stage: ResultStage =>
JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
partitions = stage.rdd.partitions
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
// Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
// Build the task list; the task type depends on the stage type
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
// Create a ShuffleMapTask for each partition, with its preferred locations
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
// Every stage except finalStage gets ShuffleMapTasks
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
// If it is not a ShuffleMapStage, it is the finalStage, which creates ResultTasks
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
stage.rdd.isBarrier())
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
// Wrap the tasks in a TaskSet and submit it to the TaskScheduler
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)
stage match {
case stage: ShuffleMapStage =>
logDebug(s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})")
// With no tasks to run, the stage is already complete
markMapStageJobsAsFinished(stage)
case stage: ResultStage =>
logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
}
submitWaitingChildStages(stage)
}
}
2. The getPreferredLocsInternal method
/**
* Recursive implementation for getPreferredLocs.
*
* This method is thread-safe because it only accesses DAGScheduler state through thread-safe
* methods (getCacheLocs()); please be careful when modifying this method, because any new
* DAGScheduler state accessed by it may require additional synchronization.
*
 * Compute the best location for each task's partition:
 * starting from the stage's last RDD, look for an RDD whose partition has been
 * cached or checkpointed; the task's best location is then the location of that
 * cached/checkpointed partition, because running the task on that node avoids
 * recomputing all the upstream RDDs.
*/
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
// If the (rdd, partition) pair was already visited, its TaskLocation has been resolved; skip it
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// If the partition is cached, return the cache locations
// Check whether the current rdd's partition is cached
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
// Check the RDD's placement preferences (this covers checkpointed partitions and input RDDs)
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
// Finally, recurse into the parent RDDs (via narrow dependencies) to check whether their partitions are cached or checkpointed
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
// If, from the last RDD of the stage back to the first, no partition was cached
// or checkpointed, the task has no preferred location: preferredLocs is Nil
Nil
}
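SparkContext.makeRDD offers a public way to attach preferred locations, which is handy for observing this algorithm; a hedged sketch (host1/host2 are hypothetical hostnames, assuming a live SparkContext sc):

// Each element carries its preferred hosts; the DAGScheduler hands these back
// as TaskLocations when it builds taskIdToLocations.
val located = sc.makeRDD(Seq(
  (1, Seq("host1")),
  (2, Seq("host2"))
))
println(located.preferredLocations(located.partitions(0))) // e.g. List(host1)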
No matter how the preferred locations of an RDD partition are obtained, the first computation always sources them from the RDD's preferredLocations method. Different RDDs implement preferredLocations differently, but the data can only live in three places: cached in executor memory, on HDFS, or on a host's disk; each of the three has a concrete TaskLocation implementation.
/**
* A location that includes both a host and an executor id on that host.
*/
// Data cached in memory on an executor
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
extends TaskLocation {
override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
}
/**
* A location on a host.
*/
// Data on a host's disk (not on HDFS)
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = host
}
/**
* A location on a host that is cached by HDFS.
*/
// Data cached by HDFS on a host
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
override def toString: String = TaskLocation.inMemoryLocationTag + host
}
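For completeness, a hedged sketch of how an RDD reports these locations in the first place: a hypothetical RDD subclass overriding getPreferredLocations (HadoopRDD does the same thing using HDFS block locations):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class DummyLocatedRDD(sc: SparkContext) extends RDD[Int](sc, Nil) {
  override def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(1, 2, 3)

  // Report where this partition's data lives; the DAGScheduler turns these
  // strings into TaskLocations via rddPrefs.map(TaskLocation(_)), as shown above.
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq("host1", "host2") // hypothetical hostnames
}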