Spark TaskScheduler: Functionality and Source Code Analysis

TaskScheduler is an abstract scheduling interface; at present Spark provides only one implementation, TaskSchedulerImpl. It is created and initialized inside SparkContext.

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging

TaskScheduler is effectively a proxy in front of SchedulerBackend: it handles the generic logic itself, such as the scheduling order across different jobs and resubmitting slow-running tasks on idle nodes (speculation).
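
As a hedged aside, the speculation behaviour referred to here is controlled by standard Spark configuration keys (not shown in the quoted source); a minimal sketch:

import org.apache.spark.SparkConf

// Minimal sketch: enable speculative execution and tune when a task counts as "slow".
// All keys are standard Spark settings; the values are purely illustrative.
val conf = new SparkConf()
  .set("spark.speculation", "true")            // disabled by default
  .set("spark.speculation.interval", "100ms")  // how often checkSpeculatableTasks runs
  .set("spark.speculation.multiplier", "1.5")  // tasks slower than 1.5x the median are candidates
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before checking

Note, from the TaskSchedulerImpl.start method quoted later, that the speculation thread is only started when the application is not running in local mode.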

// SparkContext calls TaskSchedulerImpl.initialize, passing in the SchedulerBackend object
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}

Pool is used to schedule TaskSetManagers. In fact both Pool and TaskSetManager extend the Schedulable trait, so a Pool may contain TaskSetManagers as well as other Pools.
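
To make the composite structure concrete, here is a deliberately simplified, self-contained sketch; it is not the real Spark Schedulable/Pool code, which also carries weights, minShare, a scheduling algorithm and more:

import scala.collection.mutable.ArrayBuffer

// Simplified sketch of the composite pattern: both leaves (task set managers)
// and pools expose the same interface, so pools can nest pools.
trait SchedulableSketch {
  def name: String
  def leaves: ArrayBuffer[TaskSetManagerSketch]
}

class TaskSetManagerSketch(val name: String) extends SchedulableSketch {
  def leaves: ArrayBuffer[TaskSetManagerSketch] = ArrayBuffer(this)
}

class PoolSketch(val name: String) extends SchedulableSketch {
  private val children = new ArrayBuffer[SchedulableSketch]
  def add(s: SchedulableSketch): Unit = children += s
  // Flatten children (pools or task set managers) into one scheduling order;
  // the real getSortedTaskSetQueue also sorts by the pool's scheduling algorithm.
  def leaves: ArrayBuffer[TaskSetManagerSketch] = children.flatMap(_.leaves)
}

The rootPool created in initialize above is the top-level pool; the FIFO builder places TaskSetManagers directly under it, while the FAIR builder creates child pools beneath it.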

Spark uses the FIFO (First In, First Out) scheduling mode by default; a FAIR mode is also available. FIFO mode has a single pool, while FAIR mode has multiple pools. Pools themselves also come in FIFO and FAIR flavors, corresponding to FIFOSchedulableBuilder and FairSchedulableBuilder respectively.
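
For reference (a hedged illustration using standard Spark settings rather than anything from the quoted source), FAIR mode is turned on via configuration, and a job can be routed to a named pool through a local property:

import org.apache.spark.SparkConf

// Minimal sketch: switch to FAIR scheduling and (optionally) load pool definitions
// from an allocation file; the file path below is just an example.
val conf = new SparkConf()
  .set("spark.scheduler.mode", "FAIR")
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

// After creating the SparkContext, a thread can place its jobs into a specific pool:
// sc.setLocalProperty("spark.scheduler.pool", "production")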

SparkContext decides which SchedulerBackend to use based on the master parameter. Taking Spark Standalone mode as an example, the backend is SparkDeploySchedulerBackend, which extends the CoarseGrainedSchedulerBackend parent class.

private[spark] class SparkDeploySchedulerBackend(
    scheduler: TaskSchedulerImpl,
    sc: SparkContext,
    masters: Array[String])
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
  with AppClientListener
  with Logging {

// SparkContext calls TaskSchedulerImpl.start
override def start() {
  backend.start()

  // If speculation is enabled, start a thread that resubmits slow-running tasks on idle resources
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds, SPECULATION_INTERVAL_MS milliseconds) {
      Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
    }(sc.env.actorSystem.dispatcher)
  }
}

The SparkDeploySchedulerBackend.start method initializes an AppClient object, which is mainly used by the Driver to exchange information with the Master over Akka and to register the Spark Application.

// SparkDeploySchedulerBackend.start()
override def start() {
  super.start()
  ...
  val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
  val javaOpts = sparkJavaOpts ++ extraJavaOpts

  // Wrap CoarseGrainedExecutorBackend into a Command
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
  val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)

  // Initialize the AppClient object
  client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}

Now to the main topic: how tasks are scheduled and executed.

At the end of the previous article, "Spark DAGScheduler: Functionality and Source Code Analysis", DAGScheduler calls TaskSchedulerImpl.submitTasks to submit a TaskSet for execution.

// TaskSchedulerImpl.submitTasks
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  // TaskSetManager is not thread-safe, so operations on it must be synchronized
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}

SparkDeploySchedulerBackend.reviveOffers calls the parent-class method CoarseGrainedSchedulerBackend.reviveOffers; via message passing this leads to CoarseGrainedSchedulerBackend.makeOffers and then to CoarseGrainedSchedulerBackend.launchTasks, whose argument is what TaskSchedulerImpl.resourceOffers returns.
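
A paraphrased sketch of that message chain (simplified, not a verbatim quote of CoarseGrainedSchedulerBackend):

// Paraphrased sketch: reviveOffers just sends a ReviveOffers message to the driver
// endpoint, whose receive handler responds by making offers and launching tasks.
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

// inside the driver endpoint's message handler (simplified):
case ReviveOffers =>
  makeOffers()   // scheduler.resourceOffers(...) -> launchTasks(...)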

CoarseGrainedSchedulerBackend manages executors in a coarse-grained way: executor resources are held for the entire lifetime of the job, rather than being released when a task finishes and re-requested when a new task arrives. The advantage of coarse-grained management is that resource accounting stays simple; the drawback is that executor resources can sit idle and be wasted. Fine-grained management, in contrast, supports executor reuse and preemption to improve utilization; currently only Mesos offers fine-grained management.
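
As a side note (hedged: this relies on the standard spark.mesos.coarse setting, which is not part of the quoted source), the granularity used on Mesos is chosen through configuration:

import org.apache.spark.SparkConf

// Sketch: pick between Mesos' coarse-grained and fine-grained modes.
val conf = new SparkConf()
  .set("spark.mesos.coarse", "true")    // coarse-grained: hold executors for the whole application
// .set("spark.mesos.coarse", "false")  // fine-grained: per-task resource accounting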

Whenever resources change, for example a new executor joins (adding cores), an existing executor is removed (removing cores), or an executor finishes its current task (freeing cores), TaskScheduler notifies CoarseGrainedSchedulerBackend, which then calls TaskScheduler.resourceOffers through makeOffers to run another round of allocation over the waiting tasks (system resources are presented as WorkerOffers).
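
For orientation, a WorkerOffer is just a small value object describing the free resources on one executor; in this generation of Spark its shape is essentially the following (shown for reference, slightly simplified):

// Essentially the shape of WorkerOffer: which executor is offering, on which host,
// and how many free CPU cores it currently has.
case class WorkerOffer(executorId: String, host: String, cores: Int)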

// CoarseGrainedSchedulerBackend.makeOffers
def makeOffers(executorId: String) {
  val executorData = executorDataMap(executorId)
  launchTasks(scheduler.resourceOffers(
    Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
}
// TaskSchedulerImpl.resourceOffers is called by the cluster manager; the offers parameter describes the resources provided by workers. Based on those resources and the priority of the waiting tasks, it assigns tasks to executors in a balanced way.
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark all slave nodes as alive, record their hostnames, and check whether any new executor has joined
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Randomly shuffle the offers so that tasks are not always assigned to the same first few workers
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of task buffers, one per worker, to hold the assigned tasks
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // TaskSetManagers sorted by priority; the ordering is determined by the Pool's scheduling mode (FIFO/FAIR)
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Walk the TaskSets in scheduling-priority order and, over all offers (WorkerOffers), try to launch the best-fitting tasks from the highest locality level down to the lowest
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}


// TaskSchedulerImpl.resourceOfferSingleTaskSet
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Check whether the executor has enough free CPU cores to run a task
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        // The real work is done by TaskSetManager.resourceOffer
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetId(tid) = taskSet.taskSet.id
          taskIdToExecutorId(tid) = execId
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}

The purpose of TaskSetManager.resourceOffer is to hand the offered executor resource the task that best matches data locality; this is where the locality-related logic comes in.

TaskLocality is an enumeration describing the data-locality level. In descending priority the levels are PROCESS_LOCAL (highest), NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY (lowest). PROCESS_LOCAL, NODE_LOCAL and RACK_LOCAL each have a configurable wait (delay) time, whose default value is 3s.
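
Those per-level delays correspond to standard configuration keys; as a hedged illustration (not taken from the quoted source):

import org.apache.spark.SparkConf

// Minimal sketch: the global locality wait plus the per-level overrides.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait applied at each level
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL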

TaskSetManager internally maintains the following pending-task queues:

  • pendingTasksForExecutor
  • pendingTasksForHost
  • pendingTasksForRack
  • pendingTasksWithNoPrefs

When a TaskSetManager is initialized, a task whose preferredLocations is non-empty is added to the first three pending queues (for the matching executor, host and rack); a task with no preferred locations goes into pendingTasksWithNoPrefs.
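
A heavily simplified, self-contained sketch of that placement logic (illustrative only; Spark's real addPendingTask works on TaskLocation subclasses and the TaskSetManager's own fields):

import scala.collection.mutable.{ArrayBuffer, HashMap}

// Illustrative sketch: route a task index into the per-executor / per-host / per-rack
// pending queues, or into the "no preference" queue when it has no preferred locations.
case class LocationSketch(executorId: Option[String], host: String, rack: Option[String])

class PendingQueuesSketch {
  val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksForRack     = new HashMap[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = new ArrayBuffer[Int]

  def addPendingTask(index: Int, preferredLocations: Seq[LocationSketch]): Unit = {
    if (preferredLocations.isEmpty) {
      pendingTasksWithNoPrefs += index
    } else {
      for (loc <- preferredLocations) {
        loc.executorId.foreach(e =>
          pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer[Int]) += index)
        pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer[Int]) += index
        loc.rack.foreach(r =>
          pendingTasksForRack.getOrElseUpdate(r, new ArrayBuffer[Int]) += index)
      }
    }
  }
}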

// TaskSetManager.resourceOffer
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      // Combine each level's configured wait time with the time a task was last launched at the current level to determine the most local level currently allowed
      allowedLocality = getAllowedLocalityLevel(curTime)
      // a greater value means a less local (lower) level
      if (allowedLocality > maxLocality) {
        // never exceed the maxLocality constraint passed in by the caller
        allowedLocality = maxLocality
      }
    }

    // dequeueTask returns the TaskDescription of the most local task available within the allowed locality range
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Unless the task's locality is NO_PREF, update the current locality level to this task's level and refresh lastLaunchTime
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // serialize the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // a serialization failure is not worth retrying
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // warn if the serialized task is very large, since that usually indicates something worth optimizing
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)

        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo("Starting %s (TID %d, %s, %s, %d bytes)".format(
            taskName, taskId, host, taskLocality, serializedTask.limit))

        // notify DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}

TaskSetManager.getAllowedLocalityLevel combines the wait time configured for each locality level with the time a task was last successfully launched at the current level, and returns the most local level that is currently allowed.

// TaskSetManager.getAllowedLocalityLevel
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // lazily remove tasks that have already been scheduled or have finished
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk pendingTasks, dropping tasks that have already been scheduled; return true if any task still needs to run
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }

  // currentLocalityIndex records which TaskLocality level we are currently scheduling at
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // No runnable tasks at the current locality level: move down one level and refresh lastLaunchTime
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // If the time since a task was last launched at this level exceeds the level's configured wait, move down one level and advance lastLaunchTime
      lastLaunchTime += localityWaits(currentLocalityIndex)
      currentLocalityIndex += 1
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}

Once the most suitable task has been obtained, SchedulerBackend.launchTasks (here, the CoarseGrainedSchedulerBackend implementation) is called to launch it.

// CoarseGrainedSchedulerBackend.launchTasks
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)
    // If the serialized task exceeds the Akka frame size limit, abort the TaskSet with an explanatory message
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

Through message passing, Executor.launchTask is eventually invoked.

// Executor.launchTask
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}

TaskRunner is the class that actually runs the task: it is responsible for deserializing the Task and its RDD, executing the task, and recording how long the execution took. It is likewise defined in the Executor.scala file.
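
To close, a heavily simplified, self-contained sketch of that flow; the names and signatures below are illustrative only, while the real TaskRunner also updates dependencies, collects metrics, routes large results through the block manager and reports detailed failure reasons:

import java.nio.ByteBuffer

// Illustrative sketch: deserialize-and-run a task, time it, and report status back.
trait BackendLike {
  def statusUpdate(taskId: Long, state: String, data: ByteBuffer): Unit
}

class TaskRunnerSketch(backend: BackendLike, taskId: Long,
    serializedTask: ByteBuffer) extends Runnable {

  override def run(): Unit = {
    backend.statusUpdate(taskId, "RUNNING", ByteBuffer.allocate(0))
    try {
      val start = System.currentTimeMillis()
      val result = deserializeAndRun(serializedTask)      // deserialize Task + RDD, then execute
      val runTimeMs = System.currentTimeMillis() - start  // the run-time statistic mentioned above
      println(s"task $taskId finished in $runTimeMs ms")
      backend.statusUpdate(taskId, "FINISHED", result)
    } catch {
      case t: Throwable =>
        backend.statusUpdate(taskId, "FAILED", ByteBuffer.allocate(0))
    }
  }

  // placeholder standing in for deserialization + task execution
  private def deserializeAndRun(bytes: ByteBuffer): ByteBuffer = ByteBuffer.allocate(0)
}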