1. 程式人生 > >TaskScheduler內幕天機:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO與FAIR、Task執行時本地性演算法詳解

TaskScheduler內幕天機:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO與FAIR、Task執行時本地性演算法詳解

  1. TaskSchedulerBackend與SchedulerBackend

  2. FIFO與FAIR兩種排程模式

  3. Task資料本地性資源的分配

一、TaskScheduler執行過程(Spark-shell角度)
1.啟動Spark-shell
當我們spark-shell本身的時候命令終端返回來的主要是ClientEndpoint和SparkDeploySchedulerBakcend。這是因為此時還沒有任何應用程式Job的觸發,這是啟動Application本身而已,所以主要就是例項化SparkContext並註冊當前的應用程式給Master,並從叢集中獲得ExecutorBackend的計算資源;(這就是為什麼啟動時日誌沒有DriverEndpoint資訊的原因,因為此時應用程式內部還未發生具體計算資源的排程)
2.TaskScheduler執行時機
DAGScheduler劃分好Stage後,會通過TaskSchedulerImpl中的TaskSetManager來管理當前要執行的Stage中的所有的任務TaskSet,TaskSetManager會根據locality aware來為Task奉陪計算資源,監控Task的執行狀態。(例如重試、慢任務以及進行推測式執行等)
二、TaskScheduler與SchedulerBackend
1.底層排程的總流程
(1)TaskScheduler提交Tasks
TaskScheduler.submitTasks方法主要作用是將TaskSet加入到TaskSetManager中進行管理。

//TaskScheduler裡面只是定義了submitTasks方法,具體實現是在TaskSchedulerImpl
override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
    //建立TaskSetManager,並設定最大失敗重試次數
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val
stage = taskSet.stageId //記錄Stage中提交的TaskSetManager val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager //如果重複提交同一個TaskSet或者Tasks不在當前的TaskSet中則會報錯 val conflictingTaskSet = stageTaskSets.exists { case
(_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } //新增TaskManager到排程佇列中,schedulableBuilder是應用程式級別的排程器 schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)<span style="white-space:pre"> </span>//1 //為慢任務啟動備份任務 if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } }<pre name="code" class="plain"> // default scheduler is FIFOprivate val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") (TaskSchedulerImpl) }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true }//呼叫SparkDeploySchedulerBackend分配具體計算資源 backend.reviveOffers() //2 }

(2)新增TaskSetManager
SchedulerBuilder.addTaskSetManger(根據SchedulerMode的不同,FIFO與FAIR實現不同)方法會確定TaskSetManager的排程順序,然後按照TaskSetManager的locality aware來確定每個Task具體執行在那個ExecutorBackend中。

預設的排程順序為FIFO;Spark應用程式目前支援兩種排程模式FIFO和FAIR可以通過Spark-env.sh中的Spark.Scheduler.mode來進行具體的設定

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }

並且預設情況下是FIFO的方式:

// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")

schedulableBuilder是一個介面,裡面定義了addTaskSetManager方法。

private[spark] trait SchedulableBuilder {
  def rootPool: Pool

  def buildPools()

  def addTaskSetManager(manager: Schedulable, properties: Properties)
}

schedulableBuilder確定了TaskSetManager排程順序。
知道了schedulableBuilder是咋回事之後,那麼真正的呼叫就開始啦!
然後按照TaskSetManager的locality aware來確定每個Task具體執行在哪個ExecutorBackend中;
CoarseGrainedSchedulerBackend.reviveOffers:給DriverEndpoint傳送ReviveOffers。backend.reviveOffers()
而scheduleBackend只是定義了reviveOffers方法。def reviveOffers(): Unit
reviveOffers方法的具體實現是在:在CoarseGrainedSchedulerBackend實現,給DriverEndpoint傳送ReviveOffers訊息。

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}

ReviveOffers本身是一個空的case object物件,只是起到觸發底層資源排程的作用,在有Task提交或者計算資源變動的時候會發送ReviveOffers這個訊息作為觸發器。

// Internal messages in driver
case object ReviveOffers extends CoarseGrainedClusterMessage

此時DriverEndpoint收到ReviveOffers後,路由到makeOffers中。

case ReviveOffers =>
  makeOffers()

首先會準備好所有可以用於計算的workOffers(代表了所有可用ExecutorBackend中可以使用的Cores等資訊),因為之前的資源已經分配好了,現在只需要關係有哪些cores可以用於Task計算。

// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
//產生集合,裡面包含executor的ID,freeCores
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}

將可用的計算資源準備好後,下面就可以為每個Task分配計算資源了
TaskSchedulerImpl.resourceOffers為每一個Task具體分配計算資源。輸入是workOffers代表可用的資源,實質上是ExecutorBackend的列表。

launchTasks(scheduler.resourceOffers(workOffers))

輸出值是:TaskDescription的二維陣列

// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

TaskDescription原始碼:
被TaskSetManager.resourceOffer建立的。而TaskDescription是用來描述哪些要傳送到executorbackend上計算的Task。也就是說TaskDescription此時描述的這個Task,是已經確定好了在哪個ExecutorBackend上執行。而確定Task具體執行在哪個ExecutorBackend上的演算法是由TaskSetManager的resourceOffers方法來定的。

/**
 * Description of a task that gets passed onto executors to be executed, usually created by
 * [[TaskSetManager.resourceOffer]].
 */
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int,    // Index within this task's TaskSet
    _serializedTask: ByteBuffer)
  extends Serializable {

resourceOffers到底是如何確定Task具體執行在哪個ExecutorBackend上的呢?演算法的實現具體如下:
具體到resourceOffers檢視原始碼如下:
1. 通過Random.shuffle打散的是executorBackend的計算資源,防止Task集中分佈到某些機器上,為了負載均衡。

// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)

2.根據每個ExecutorBackend的cores的個數宣告型別為TaskDecription的ArrayBuffer陣列。

// Build a list of tasks to assign to each worker.
//為每個worker建立了一個ArrayBuffer例項,
//每個executor上能放多少個TaskDescription就可以執行多少個Task。
//tasks的陣列長度是由cores的多少決定的,cores也決定了worker上可以執行多少//個任務。
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// getSortedTaskSetQueue對TaskSetManager按照排程策略進行排序,將排序好的結//果賦值給sortedTaskSets
val sortedTaskSets = rootPool.getSortedTaskSetQueue

3.如果有新的ExecutorBackend分配給我們的Job,此時會呼叫executorAdd來獲取最新的完整的可用計算的計算資源,因為在執行中叢集中的資源可能會動態的改變的。

for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) { //如果有可用的新的executor
    taskSet.executorAdded()
  }

4.下面的增強for迴圈執行是這樣的,每取出一個taskSet,maxLocality就會依次從PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY遍歷。從優先順序高到低來遍歷。追求最高級別的優先順序本地性。maxLocality會傳入resourceOfferSingleTaskSet.

 // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
          taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}

下面具體看一下resourceOfferSingleTaskSet原始碼

5. 此時的maxLocality就傳入到了resourceOffer,通過呼叫TastSetManager的resourceOffer來確定Task應該執行在哪個ExecutorBackend的具體的Locality Level;

for (i <- 0 until shuffledOffers.size) {//迴圈遍歷當前存在的executor
  val execId = shuffledOffers(i).executorId //獲取executor的ID
  val host = shuffledOffers(i).host //executor的host名字
  if (availableCpus(i) >= CPUS_PER_TASK) {  //每臺機器可用的計算資源
    try {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager(tid) = taskSet
        taskIdToExecutorId(tid) = execId
        executorIdToTaskCount(execId) += 1
        executorsByHost(host) += execId
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }

6.確定好Task具體在哪個ExecutorBackend執行之後,通過luanchTasks把任務傳送給ExecutorBackend去執行。

launchTasks(scheduler.resourceOffers(workOffers))

補講:
1.Task預設的最大重試次數是4次:

def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4))

2.Spark應用程式目前支援兩種排程器:FIFO、FAIR,可以通過spark-env.sh中spark.scheduler.mode進行具體的設定,預設情況下是FIFO的方式:

private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)

3.TaskScheduler中要負責為Task分配計算資源:此時程式已經具備叢集中的計算資源了,根據計算本地性原則確定Task具體要執行在哪個ExecutorBackend中;
4.資料本地優先順序從高到底以此為:優先順序高低排: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,其中NO_PREF是指機器本地性
5.每個Task預設分配的core數為1

// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)

6.TaskSet類詳解TaskSet包含了一系列高層排程器交給底層排程器的任務的集合。

/**
 * A set of tasks submitted together to the low-level TaskScheduler, usually representing
 * missing partitions of a particular stage.
 */
private[spark] class TaskSet(
    val tasks: Array[Task[_]],//任意型別的Task
    val stageId: Int,   //Task屬於哪個Stage
    val stageAttemptId: Int, //嘗試的Id
    val priority: Int,  //優先順序
    val properties: Properties) {
  val id: String = stageId + "." + stageAttemptId

  override def toString: String = "TaskSet " + id
}

排程的時候,底層是有一個pool排程池,這個排程池會規定Stage提交之後具體執行的優先順序。
TaskSetManager(TaskSet的管理者)
例項化的時候要完成TaskSchedulerImpl工作的。

private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet, //接收提交的任務的集合
    val maxTaskFailures: Int,//最大失敗提交次數
    clock: Clock = new SystemClock())
  extends Schedulable with Logging {

  val conf = sched.sc.conf

7.DAGScheduler是從資料層面考慮preferedLocation的,確定資料在哪,而TaskScheduler是從具體計算Task角度考慮計算的本地性,在哪計算,優先考慮在記憶體中。

8.Task進行廣播時候的AKKAFrameSize大小為128MB,如果任務大於128MB-200K的時候,則Task會直接被丟棄掉。

/** Returns the configured max frame size for Akka messages in bytes. */
def maxFrameSizeBytes(conf: SparkConf): Int = {
  val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)

如果小於128 MB-200K的話會通過CoarseGrainedSchedulerBackend去luanch到具體的ExecutorBackend上。executorEndpoint就會把當前的Task傳送到要執行的executorBackend上。通過LaunchTask實現。

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))