Spark TaskScheduler: Functionality and Source Code Analysis
TaskScheduler is an abstract interface (a trait); currently Spark provides only one implementation, TaskSchedulerImpl, which is instantiated inside SparkContext:
private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging
TaskScheduler is effectively a proxy for SchedulerBackend: it handles some common logic itself, such as the scheduling order between different jobs and resubmitting slow-running tasks on idle nodes (speculation).
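For comparison, here is a simplified sketch of the abstract TaskScheduler interface that TaskSchedulerImpl implements (method list abridged and signatures simplified from the Spark 1.x source; consult the actual trait for the full contract):

// Simplified sketch of the TaskScheduler trait (abridged)
private[spark] trait TaskScheduler {
  def rootPool: Pool
  def schedulingMode: SchedulingMode
  def start(): Unit
  def stop(): Unit
  // Submit a set of tasks to run
  def submitTasks(taskSet: TaskSet): Unit
  // Cancel a stage's tasks
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
  // The DAGScheduler registers itself here so it can receive callbacks (e.g. taskStarted)
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit
  // Default level of parallelism, used by SparkContext
  def defaultParallelism(): Int
}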
// SparkContext calls TaskSchedulerImpl.initialize, passing in the SchedulerBackend object
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
Pool is used to schedule TaskSetManagers. In fact both Pool and TaskSetManager extend the Schedulable trait, so a Pool can contain TaskSetManagers as well as other Pools.
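A minimal sketch of this composite structure is shown below (members abridged and signatures simplified; the real Schedulable trait also carries scheduling state such as weight, minShare and priority):

// Simplified sketch: Pool and TaskSetManager are both Schedulables (abridged)
private[spark] trait Schedulable {
  def parent: Pool
  def schedulingMode: SchedulingMode
  def name: String
  def runningTasks: Int
  def addSchedulable(schedulable: Schedulable): Unit
  // Flattens the tree into a priority-ordered list of TaskSetManagers
  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
// A Pool holds a queue of child Schedulables, which may be TaskSetManagers or nested Pools:
//   private[spark] class Pool(...) extends Schedulable { val schedulableQueue = ... }
//   private[spark] class TaskSetManager(...) extends Schedulable { ... }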
Spark uses the FIFO (First In, First Out) scheduling mode by default; FAIR scheduling is also available. In FIFO mode there is a single pool, while FAIR mode can have multiple pools. Pools themselves also come in FIFO and FAIR flavors, and the two modes are built by FIFOSchedulableBuilder and FairSchedulableBuilder respectively.
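For example, FAIR scheduling is enabled through configuration, and jobs can be routed to a named pool via a thread-local property (the pool name and file path below are illustrative; pools themselves are defined in an XML allocation file referenced by spark.scheduler.allocation.file):

// Enabling FAIR scheduling (illustrative example)
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical path
val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the "production" pool defined in the XML file
sc.setLocalProperty("spark.scheduler.pool", "production")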
SparkContext decides which SchedulerBackend to use based on the master parameter. Taking Spark Standalone mode as an example, the backend used is SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend:
private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging {
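The selection itself happens in SparkContext.createTaskScheduler, which pattern-matches on the master URL; a simplified sketch of the standalone branch (abridged from the Spark 1.x source, other branches omitted, so details may differ between versions):

// Simplified sketch of SparkContext.createTaskScheduler (standalone branch only)
master match {
  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    // Wire the backend into the scheduler (see initialize above)
    scheduler.initialize(backend)
    (backend, scheduler)
  // case "local", LOCAL_N_REGEX, yarn and mesos URLs, ... (omitted)
}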
// SparkContext calls TaskSchedulerImpl.start
override def start() {
  backend.start()
  // If speculation is enabled (and we are not in local mode), start a thread that
  // periodically resubmits slow-running tasks on idle resources
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
        SPECULATION_INTERVAL_MS milliseconds) {
      Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
    }(sc.env.actorSystem.dispatcher)
  }
}
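Speculative execution is controlled by a handful of user-facing settings; a minimal example of turning it on (the tuning values shown are illustrative, not recommendations):

// Enabling speculative re-execution of slow tasks (illustrative values)
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.speculation", "true")             // off by default
  .set("spark.speculation.interval", "100ms")   // how often to check for speculatable tasks
  .set("spark.speculation.multiplier", "1.5")   // how much slower than the median a task must be
  .set("spark.speculation.quantile", "0.75")    // fraction of tasks that must finish before checking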
The SparkDeploySchedulerBackend.start method initializes an AppClient object, which is mainly responsible for Akka communication between the Driver and the Master, registering the Spark Application, and so on.
// SparkDeploySchedulerBackend.start()
override def start() {
  super.start()
  ...
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts
  // Wrap CoarseGrainedExecutorBackend into a Command
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
    command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  // Initialize the AppClient object
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}
Now to the main topic: scheduling and running tasks.
At the end of the previous article, "Spark DAGScheduler: Functionality and Source Code Analysis", DAGScheduler calls the TaskSchedulerImpl.submitTasks method to submit a TaskSet for execution:
// TaskSchedulerImpl.submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  // TaskSetManager is not thread-safe, so all operations on it must be synchronized
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
SparkDeploySchedulerBackend.reviveOffers calls the parent class method CoarseGrainedSchedulerBackend.reviveOffers; after a round of message passing this leads to CoarseGrainedSchedulerBackend.makeOffers and then to CoarseGrainedSchedulerBackend.launchTasks, whose argument is the value returned by TaskSchedulerImpl.resourceOffers.
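A simplified sketch of that message hop as it appears in Spark 1.x-era source (abridged; the exact endpoint code differs between versions):

// Simplified sketch of the ReviveOffers message flow (abridged)
// CoarseGrainedSchedulerBackend
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)   // asynchronous message to the driver endpoint
}

// CoarseGrainedSchedulerBackend.DriverEndpoint.receive
case ReviveOffers =>
  makeOffers()   // offer all free resources to TaskSchedulerImpl.resourceOffers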
CoarseGrainedSchedulerBackend manages executors in a coarse-grained fashion: executor resources are held for the entire lifetime of the job, rather than being released when a task finishes and requested again when a new task arrives. The benefit of coarse-grained management is that resource accounting is relatively simple; the drawback is that executor resources may sit idle and be wasted. Fine-grained management, by contrast, allows executors to be reused and preempted to improve utilization; currently only Mesos provides fine-grained management.
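When running on Mesos (in Spark 1.x), the choice between the two modes is a single configuration switch; a minimal illustration (the master URL is a placeholder):

// Choosing the Mesos scheduling mode (illustrative)
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://host:5050")       // hypothetical Mesos master URL
  .set("spark.mesos.coarse", "false")   // false = fine-grained mode, true = coarse-grained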
Whenever resources change, for example a new executor joins (adding cores), an existing executor is removed (removing cores), or an executor finishes its current task (freeing cores), CoarseGrainedSchedulerBackend is notified and calls TaskScheduler.resourceOffers through its makeOffers method, performing a round of assignment for the tasks in the waiting queue (the available resources are presented as WorkerOffers):
// CoarseGrainedSchedulerBackend.makeOffers
def makeOffers(executorId: String) {
  val executorData = executorDataMap(executorId)
  launchTasks(scheduler.resourceOffers(
    Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}
// TaskSchedulerImpl.resourceOffers is called by the cluster manager; the offers parameter
// describes the resources offered by the workers. Based on those resources and the priority
// of the pending tasks, it assigns tasks to executors in a balanced way.
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive, remember its hostname, and check whether a new executor has joined
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // Randomly shuffle the offers so that tasks are not always assigned to the same few workers
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of task buffers, one per worker, to hold the assigned tasks
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // TaskSetManagers sorted by priority; the order is determined by the Pool's scheduling mode (FIFO/FAIR)
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // Walk the TaskSets in scheduling order and, across all offered resources (WorkerOffers),
  // try to launch the best-fitting tasks from the highest locality level down to the lowest
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Check whether the executor has enough free CPU cores to run a task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // The real work is done by TaskSetManager.resourceOffer
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
The TaskSetManager.resourceOffer method offers, for the given executor resource, the single task that best matches data locality; this is where the locality-related logic lives.
TaskLocality is an enumeration describing the data-locality level. From highest to lowest priority the levels are PROCESS_LOCAL > NODE_LOCAL > NO_PREF > RACK_LOCAL > ANY. The PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL levels each have a configurable wait time, with a default of 3s.
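These wait times correspond to the spark.locality.wait family of settings; a minimal example (the values shown are illustrative):

// Tuning delay-scheduling wait times (illustrative values)
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait applied to each level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL wait
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL wait
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL wait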
TaskSetManager internally maintains the following pending-task collections (the first three are HashMaps keyed by executor, host and rack respectively; the last is a plain list of task indices):
- pendingTasksForExecutor
- pendingTasksForHost
- pendingTasksForRack
- pendingTasksWithNoPrefs
When the TaskSetManager is initialized, a task whose preferredLocations is non-empty is added to the first three pending collections (as applicable); a task with no preference goes into pendingTasksWithNoPrefs (see the sketch below).
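A simplified sketch of that enrollment logic, modeled on TaskSetManager.addPendingTask (abridged; HDFS-cache locations, re-adding and some bookkeeping are omitted):

// Simplified sketch of how a task index is enrolled into the pending collections (abridged)
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        // Preference for a specific executor (e.g. an RDD block cached there)
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
  // No locality preference at all
  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }
  allPendingTasks += index
}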
// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      // Combine each locality level's configured wait time with the time of the last successful
      // launch at the current level to find the best locality level currently allowed
      allowedLocality = getAllowedLocalityLevel(curTime)
      // A greater value means a lower (less local) locality level
      if (allowedLocality > maxLocality) {
        // Do not go beyond the maximum locality level allowed by the caller
        allowedLocality = maxLocality
      }
    }
    // dequeueTask returns the TaskDescription of the task with the best locality
    // within the allowed locality range
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Unless the task's locality is NO_PREF, update the current locality level to this
        // task's locality and refresh lastLaunchTime
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // Serialization failures are not worth retrying
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // A very large serialized task is a sign that something should be optimized
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)
        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
          taskName, taskId, host, taskLocality, serializedTask.limit))
        // Notify the DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
TaskSetManager.getAllowedLocalityLevel combines the wait time configured for each locality level with the time of the last successful task launch at the current level to determine the best locality level that is currently allowed:
// TaskSetManager.getAllowedLocalityLevel
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Lazily remove tasks that have already been scheduled or completed;
  // returns true if the given list still contains tasks that need to be scheduled
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk the pendingTasks map, dropping tasks that have already been scheduled;
  // returns true if any tasks remain to be scheduled
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }
  // currentLocalityIndex records which TaskLocality level we are currently at
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // No tasks left at the current locality level: move down to the next level
      // and update lastLaunchTime
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // The time since the last successful launch at this level exceeds the level's configured
      // wait: move down to the next level and advance lastLaunchTime
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}
Once the most suitable tasks have been obtained, CoarseGrainedSchedulerBackend.launchTasks is called to launch them:
// CoarseGrainedSchedulerBackend.launchTasks
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    // If the serialized task exceeds the Akka frame size limit, abort the TaskSet
    // with an explanatory message
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
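As the warning text suggests, the usual fix when a task grows too large because it closes over a big value is to ship that value once per executor as a broadcast variable instead of embedding it in every serialized task; a minimal sketch (sc, loadLookupTable and the input path are placeholders):

// Sketch: replacing a large closure value with a broadcast variable (placeholder names)
val lookupTable: Map[String, Int] = loadLookupTable()   // hypothetical large value
val bcLookup = sc.broadcast(lookupTable)                // shipped to each executor once

val data = sc.textFile("hdfs:///path/to/input")         // hypothetical input path
val resolved = data.map { line =>
  // Read the broadcast value inside the task instead of capturing lookupTable directly
  bcLookup.value.getOrElse(line, -1)
}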
The LaunchTask message is received by CoarseGrainedExecutorBackend on the executor side, which then calls Executor.launchTask:
// Executor.launchTask
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
TaskRunner is the class that actually executes the task: it deserializes the Task and the RDD, runs the task, and records how long the run took. It is also defined in Executor.scala.
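To close, a heavily simplified sketch of what TaskRunner.run does (abridged and paraphrased from the Spark 1.x source; error handling, metrics, and the DirectTaskResult/IndirectTaskResult wrapping of the result are all omitted):

// Simplified sketch of TaskRunner.run (abridged; many details omitted)
override def run(): Unit = {
  // Tell the scheduler the task is running
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  val ser = env.closureSerializer.newInstance()
  // Deserialize the task and fetch any files/jars it depends on
  val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
  updateDependencies(taskFiles, taskJars)
  task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

  // Run the task and measure how long it took
  val taskStart = System.currentTimeMillis()
  val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
  val taskFinish = System.currentTimeMillis()

  // Serialize the result and report completion back to the driver
  val resultSer = env.serializer.newInstance()
  val serializedResult = resultSer.serialize(value)
  execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
}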