【Spark Core】TaskScheduler Source Code and Task Submission Analysis, Part 2
Introduction
The previous post, "TaskScheduler Source Code and Task Submission Analysis, Part 1", covered how the TaskScheduler is created. In this post I pick up from the submitMissingTasks function introduced in "Stage Generation and Stage Source Code Analysis" and walk through how tasks are created and dispatched.
The submitMissingTasks Function in DAGScheduler
Once all of a Stage's parent stages have been computed (or their results are available in the cache), DAGScheduler calls submitMissingTasks to submit the Tasks that the Stage contains.
submitMissingTasks is responsible for creating the new Tasks.
Spark divides the Tasks executed by Executors into two kinds: ShuffleMapTask and ResultTask.
When a Stage generates its tasks, the isShuffleMap flag on the Stage determines whether it is a ShuffleMapStage. If the flag is true, the Stage's output goes through a Shuffle phase and becomes the input of the next Stage, so ShuffleMapTasks are created; otherwise it is a ResultStage, ResultTasks are created, and the Stage's results leave Spark's computation (for example, they are collected back to the driver). Finally, the tasks are submitted via taskScheduler.submitTasks.
Computation Flow
The flow of submitMissingTasks is as follows:
- First, determine which partitions of the RDD still need to be computed. For a Shuffle-type stage, check whether the stage already caches the output of a partition; for the Result-type final Stage, check whether the Job has already finished computing that partition.
- Serialize the task binary. Executors obtain it through a broadcast variable and deserialize it before running each task, so tasks running on different executors are isolated from one another and do not interfere.
- Generate one task per partition that needs computing: for a Stage with a Shuffle-type dependency, create ShuffleMapTasks; for a Result-type Stage, create ResultTasks.
- Make sure the Tasks can be serialized. Since different cluster modes use different taskSchedulers, performing this check here simplifies the logic and guarantees that every task in the TaskSet is serializable.
- Submit the TaskSet through the TaskScheduler.
Code Excerpts
Below is the part of submitMissingTasks that decides whether the stage is a ShuffleMapStage; some of the parameters are explained in the comments:
val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
  partitionsToCompute.map { id =>
    val locs = getPreferredLocs(stage.rdd, id)
    val part = stage.rdd.partitions(id)
    // stage.id: the id of this Stage
    // taskBinary: explained in detail below
    // part: the RDD partition this task computes
    // locs: the preferred locations for running the task
    new ShuffleMapTask(stage.id, taskBinary, part, locs)
  }
} else {
  val job = stage.resultOfJob.get
  partitionsToCompute.map { id =>
    val p: Int = job.partitions(id)
    val part = stage.rdd.partitions(p)
    val locs = getPreferredLocs(stage.rdd, p)
    // p: index of the partition to read data from
    // id: index of the output partition, i.e. the reduce id
    new ResultTask(stage.id, taskBinary, part, locs, id)
  }
}
About the taskBinary parameter: it is a broadcast variable holding the serialized RDD and ShuffleDependency (a "broadcast version" of them).
The RDD and its dependency are serialized here and deserialized on the executor just before the task runs, which provides good isolation between different tasks.
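For reference, here is a rough sketch of how taskBinary is produced inside submitMissingTasks (simplified; names such as closureSerializer and resultOfJob follow the same era of the DAGScheduler source and may differ in other versions):
// Simplified sketch: serialize (rdd, shuffleDep) for a shuffle map stage, or
// (rdd, func) for a result stage, and broadcast the resulting bytes so every
// task can fetch and deserialize its own copy.
val taskBinaryBytes: Array[Byte] =
  if (stage.isShuffleMap) {
    closureSerializer.serialize((stage.rdd, stage.shuffleDep.get): AnyRef).array()
  } else {
    closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
  }
val taskBinary = sc.broadcast(taskBinaryBytes)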
Below is the part of submitMissingTasks that submits the tasks:
if (tasks.size > 0) {
  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
  stage.pendingTasks ++= tasks
  logDebug("New pending tasks: " + stage.pendingTasks)
  taskScheduler.submitTasks(
    new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
  // Because we posted SparkListenerStageSubmitted earlier, we should mark
  // the stage as completed here in case there are no tasks to run
  markStageAsFinished(stage, None)
  logDebug("Stage " + stage + " is actually done; %b %d %d".format(
    stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
}
submitTasks in TaskSchedulerImpl
The flow of submitTasks is:
- The tasks are wrapped in a TaskSetManager (TaskSetManager is not thread-safe, so the source synchronizes around this step).
- The TaskSetManager instance is placed into the scheduling pool by the schedulableBuilder (either a FIFOSchedulableBuilder or a FairSchedulableBuilder; see the sketch after this list) and waits to be scheduled.
- When the task set is submitted, a timer is started; if no task has been launched yet, the timer keeps logging a warning until tasks start running.
- Finally, the backend's reviveOffers function is called, which sends a ReviveOffers message to the backend's driverActor; when the driverActor receives ReviveOffers, it calls the makeOffers handler.
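As a rough sketch, the schedulableBuilder mentioned above is chosen according to spark.scheduler.mode when the scheduler is initialized (simplified from TaskSchedulerImpl.initialize; details vary by version):
// Simplified sketch: pick the SchedulableBuilder from the configured scheduling
// mode (FIFO by default, FAIR when spark.scheduler.mode=FAIR), then build the pools.
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = schedulingMode match {
  case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool)
  case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf)
}
schedulableBuilder.buildPools()
The submitTasks source itself is as follows: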
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
TaskSetManager Scheduling
Once a Stage is finalized, a corresponding TaskSet (a group of tasks) is generated and wrapped in a TaskSetManager; the DAGScheduler walks back through the Stage's lineage and submits the earliest missing Stage to the scheduling pool first. Inside the pool, TaskSetManagers are ordered by Job ID, so the TaskSetManagers of earlier-submitted Jobs are scheduled first, and within the same Job the TaskSetManager with the smaller ID is scheduled first. A TaskSetManager whose parent Stages have not finished running is not submitted to the pool at all.
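Under the default FIFO mode, this ordering is implemented by a small comparator, roughly as follows (simplified from Spark's FIFOSchedulingAlgorithm; priority here is the Job ID, and ties are broken by Stage ID):
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    // Smaller priority (earlier Job ID) is scheduled first; ties fall back to Stage ID.
    val res = math.signum(s1.priority - s2.priority)
    if (res == 0) s1.stageId < s2.stageId else res < 0
  }
}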
The reviveOffers Function
Below is the reviveOffers function of CoarseGrainedSchedulerBackend:
override def reviveOffers() {
  driverActor ! ReviveOffers
}
When the driverActor receives the ReviveOffers message, it calls the makeOffers handler.
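Inside the DriverActor, handling ReviveOffers is essentially a one-liner: the actor pattern-matches the incoming message and invokes makeOffers. A simplified sketch (the exact name of the receive method differs across the Akka-based Spark versions):
// Simplified sketch: a ReviveOffers message simply triggers a new round of resource offers.
def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()
}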
The makeOffers Function of DriverActor
The logic of makeOffers is:
- Find the idle Executors; the distribution strategy is random, so that tasks are spread as evenly as possible across the Executors.
- If there are idle Executors, take some tasks from the task list and send them to the chosen Executors via launchTasks.
The SchedulerBackend (here a CoarseGrainedSchedulerBackend) is responsible for handing the newly created Tasks to the Executors. As the launchTasks code shows, each TaskDescription must be serialized before the LaunchTask message is sent.
// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
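A WorkerOffer is just a small value object describing the free resources of one executor; its definition is roughly (simplified from org.apache.spark.scheduler.WorkerOffer):
// Represents free resources available on an executor.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)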
The resourceOffers Function in TaskSchedulerImpl
Tasks are distributed to the Executors in a randomized way; the actual resource assignment is handled by the resourceOffers function.
As mentioned for submitTasks above, in TaskSchedulerImpl each group of Tasks is handed to a new TaskSetManager instance, and all TaskSetManagers are ordered by the SchedulableBuilder according to the configured scheduling policy. In TaskSchedulerImpl's resourceOffers function, the resourceOffer function of the currently selected TaskSetManager is called and returns TaskDescriptions containing the serialized task data; these TaskDescriptions are then dispatched by the SchedulerBackend to the ExecutorBackends for execution.
resourceOffers does three main things:
- Randomly shuffle the Workers so that tasks are not always placed on the same machines.
- Through the TaskSetManagers, find the Tasks that match each Worker and package them into TaskDescriptions.
- Return the mapping Worker -> Array[TaskDescription].
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  // Walk over the resources offered by the workers and update the executor-related mappings
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Shuffle the workers so that tasks do not all pile up on the same machine
  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // One task list per worker
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // getSortedTaskSetQueue sorts the task sets according to the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  // Walk over the shuffled workers and, via TaskSetManager.resourceOffer, give each worker the
  // task with the best locality. The locality level is determined by how long the task has been
  // waiting, and falls into: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY.
  // 1. Iterate over sortedTaskSets and, for each TaskSet, over its locality levels.
  // 2. More local levels are tried first; only when no task can be launched (launchedTask is
  //    false) do we fall back to the next locality level.
  // 3. resourceOfferSingleTaskSet loops over the offer list repeatedly, because each call to
  //    taskSet.resourceOffer only takes one core rather than all of a worker's cores at once,
  //    which helps spread one taskset's tasks evenly across the workers.
  // 4. Only when no suitable task can be found for this taskset at this locality level on any
  //    worker offer do we move on to the next locality level.
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
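The per-TaskSet, per-locality round-robin referred to in the comments above lives in resourceOfferSingleTaskSet. A simplified sketch of its logic (bookkeeping and error handling trimmed; member fields such as CPUS_PER_TASK and taskIdToExecutorId are those of TaskSchedulerImpl):
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  // One pass over all offers; each TaskSetManager.resourceOffer call hands out at most one
  // task and consumes CPUS_PER_TASK cores, so repeated passes spread a taskset's tasks
  // across executors instead of filling one worker first.
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        taskIdToTaskSetId(task.taskId) = taskSet.taskSet.id
        taskIdToExecutorId(task.taskId) = execId
        executorsByHost(host) += execId
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
  }
  launchedTask
}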
The TaskDescription code:
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
  private val buffer = new SerializableBuffer(_serializedTask)

  def serializedTask: ByteBuffer = buffer.value

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
The launchTasks Function of DriverActor
The launchTasks flow:
- launchTasks serializes the TaskDescription information returned by resourceOffers.
- It then sends a LaunchTask message wrapping the serializedTask to the target executorActor.
Because of the Akka frame size limit, a task whose serialized form is too large cannot be sent; as the code below shows, such a TaskSet is aborted with an error message rather than being silently truncated (see the configuration example after the code).
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
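As the abort message above suggests, when a serialized task exceeds the frame size the TaskSet is aborted rather than sent. An illustrative way to raise the limit from application code is shown below (the value is in MB; the default and even the existence of this Akka-based property vary across Spark versions, and for large read-only data broadcast variables are the better fix):
import org.apache.spark.SparkConf

// Illustrative only: allow larger serialized tasks by raising the Akka frame size to 64 MB.
// Prefer broadcast variables for large values rather than relying on a bigger frame.
val conf = new SparkConf()
  .setAppName("frame-size-example")
  .set("spark.akka.frameSize", "64")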