1. 程式人生 > >spark任務分配----TaskSchedulerImpl原始碼解析

spark任務分配----TaskSchedulerImpl原始碼解析

TaskSchedulerImpl

上一篇講到DAGScheduler根據shuffle依賴對作業的整個計算鏈劃分成多個stage之後,就開始提交最後一個ResultStage,而由於stage之間的依賴關係,實際上最終是循著計算鏈從上到下依次提交stage的。每提交一個stage,就會將這個stage分成多個Task,並且會計算每個Task的偏向位置,將RDD和ShuffleDependency,TaskMetrics等物件序列化用於遠端傳輸,最後把一個stage的所有Task包裝成一個任務集,提交給TaskSchedulerImpl執行。本節就來分析一下這個TaskSchedulerImpl。首先把TaskSchedulerImpl的說明翻譯一下:

TaskSchedulerImpl的主要作用是排程Task,內部通過排程後端進行實際任務的傳輸。不同的叢集型別對應不同的具體的排程後端的實現,例如本地模式的排程後端實現是LocalSchedulerBackend,而任務排程器的實現只有一種就是TaskSchedulerImpl。TaskSchedulerImpl主要處理一些通用的邏輯,例如在多個作業之間決定排程順序,執行推測執行的邏輯等等。

TaskSchedulerImpl在使用之前應該先呼叫initialize() 和 start()方法,然後再提交任務集。

這裡插一句,這兩個方法分別在上面地方呼叫呢?都是在SparkContext初始化的時候呼叫的,具體可以看SparkContext初始化程式碼。

關於執行緒安全的一些提示:由於會存在多執行緒同時提交任務的情況,所以面向外部的public的方法必須加鎖以保持內部狀態量,簿記量的一致性。
此外,一些排程後端(SchedulerBackend)的方法會先獲取自身的鎖,然後獲取TaskSchedulerImpl物件的鎖,所有應該避免在持有TaskSchedulerImpl物件的鎖的情況下再嘗試獲取排程後端的鎖,這樣會造成死鎖。實際上這句話的意思就是因為有些操作需要同時持有排程後端的鎖和TaskSchedulerImpl鎖,對於這種需要同時持有多把鎖的情況,應該保持獲取鎖的順序是一致的,這樣就能避免出現死鎖的情況。
舉個例子:有些操作要同時獲取A鎖和B鎖,如果方法m1的獲取順序是先獲取A鎖然後獲取B鎖,而m2是先B後A,如果同時又兩個執行緒t1,t2分別執行方法m1和m2,有可能在某個時刻形成這樣的情況:t1獲取了A在等待B,而t2獲取了B在等待A,這樣互相等待而又不釋放鎖就形成死鎖。

好了,接下來我們接著任務提交的邏輯繼續分析。

submitTasks

override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
// 加鎖,更新一些簿記量和狀態量
this.synchronized {
  // 建立一個TaskSetManager,對Task集的進一步封裝
  val manager = createTaskSetManager(taskSet, maxTaskFailures)
  val stage = taskSet.stageId
  // 更新stageId和TaskSetManager的對映關係,由於失敗重試機制,一個stage可能會被多次嘗試
  val stageTaskSets =
    taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
  // 將本次嘗試的TaskSetManager新增到對映中
  stageTaskSets(taskSet.stageAttemptId) = manager
  // 檢測是否有還在執行的stage嘗試,如果有就是重複提交了,需要丟擲一個異常
  val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
    ts.taskSet != taskSet && !ts.isZombie
  }
  if (conflictingTaskSet) {
    throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
      s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
  }
  // 將新建立的TaskSetManager新增到排程池中,排程池決定了如果存在多個TaskSet在排隊應該如何進行排序,
  // 可以通過taskSet.properties設定佇列名稱,taskSet.properties是通過SparkContext的一個ThreadLocal變數設定的
  schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

  // 啟動一個定時任務列印提示資訊
  if (!isLocal && !hasReceivedTask) {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          logWarning("Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered " +
            "and have sufficient resources")
        } else {
          this.cancel()
        }
      }
    }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
  }
  hasReceivedTask = true
}
// 到這裡,鎖已經釋放
// 通知排程後端有任務可以運行了,
backend.reviveOffers()
}
  • 獲取鎖,更新一些簿記量
  • 將新的任務集封裝為TaskSetManager新增到排程池中
  • 呼叫SchedulerBackEnd,給任務分配可用資源

CoarseGrainedSchedulerBackend.reviveOffers

override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}

這個方法通過rpc模組給DriverEndPoint傳送一個訊息,本地程序內呼叫rpc方法,主要是為了程式碼模組的統一。 在DriverEndpoint.receive方法中,我們可以看到,在接收到ReviveOffers訊息後,就會呼叫makeOffers方法,

DriverEndpoint.makeOffers

private def makeOffers() {
  // Make sure no executor is killed while some task is launching on it
  // 獲取鎖,同步
  val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
    // Filter out executors under killing
    // 過濾掉正在被殺死的executor
    val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
    // 把所有可用的executor封裝成資源物件
    val workOffers = activeExecutors.map {
      case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
    }.toIndexedSeq
    // 把這些可用的資源交給TaskSchedulerImpl進行排程
    // TaskSchedulerImpl會綜合考慮任務本地性,黑名單,排程池的排程順序等因素,返回TaskDescription集合
    // TaskDescription物件是對一個Task的完整描述,
    // 包括序列化的任務資料,任務在哪個executor上執行,依賴檔案和jar包等資訊
    scheduler.resourceOffers(workOffers)
  }
  if (!taskDescs.isEmpty) {
    launchTasks(taskDescs)
  }
}

這個方法主要是將當前所有可用的資源(executor)封裝成資源物件(WorkerOffer)交給TaskSchedulerImpl,TaskSchedulerImpl會綜合考慮任務本地性,黑名單,排程池的排程順序等因素,返回TaskDescription集合,TaskDescription物件是對一個Task的完整描述,包括序列化的任務資料,任務在哪個executor上執行,依賴檔案和jar包等資訊。

從這裡也可以看出,排程後端的職責其實相對比較少主要是對executor的管理,以及呼叫rpc遠端服務的引用傳送任務資料,大部分的排程工作還是由TaskSchedulerImpl來完成。
接下來我們分析一下Task排程最重要的一個方法,TaskSchedulerImpl.resourceOffers

TaskSchedulerImpl.resourceOffers

// 這個方法由排程後端呼叫,排程後端會將可用的executor資源告訴TaskSchedulerImpl,
// TaskSchedulerImpl根據TaskSet優先順序(排程池),黑名單,本地性等因素給出要實際執行的任務。
// 我們使用round-robin的方式將任務分配到各個executor上,以使得計算資源的 使用更均衡。
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// 標記是否有新的可用executor加入
var newExecAvail = false
// 這個迴圈主要目的是兩個:
// 1. 更新一些簿記量,如物理節點和executor的相互對映關係,機架和host的對映關係,host和executor上執行的任務資訊等等
// 2. 檢查是否有新的可用executor加入
for (o <- offers) {
  if (!hostToExecutors.contains(o.host)) {
    hostToExecutors(o.host) = new HashSet[String]()
  }
  if (!executorIdToRunningTaskIds.contains(o.executorId)) {
    hostToExecutors(o.host) += o.executorId
    executorAdded(o.executorId, o.host)
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
    newExecAvail = true
  }
  for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  }
}

// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
// this here to avoid a separate thread and added synchronization overhead, and also because
// updating the blacklist is only relevant when task offers are being made.
// 觸發黑名單的超時檢查,被加入黑明單的節點或executor是由一定超時時間的,
// 在超時時間內不能像他們提交任務,而過了超時時間,這些資源將被重新投入使用
blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

// 根據最新的黑名單過濾掉在黑名單中的計算資源,包括host和executor
val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
  offers.filter { offer =>
    !blacklistTracker.isNodeBlacklisted(offer.host) &&
      !blacklistTracker.isExecutorBlacklisted(offer.executorId)
  }
}.getOrElse(offers)

// 對資源進行混洗,使得分配更加均勻,使用scala庫的Random進行混洗
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
// 每個executor能分配多少個任務,cores / CPUS_PER_TASK
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
// 每個executor提供的cpu核數
val availableCpus = shuffledOffers.map(o => o.cores).toArray
// 通過排程池對所有的任務集按優先順序進行排序,獲取排序後的任務集
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// 如果有新的executor加入,需要通知每個TaskSetManager
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
// 對於每一個任務集,為其分配資源
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  // 本地性從低到高的順序
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    // 每個本地性級別會進行多輪分配,
    // 每一輪依次輪詢每個executor,每個executor分配一個任務,
    // 這樣一輪下來每個executor都會分配到一個任務,顯然大多數情況下,executor的資源是不會被佔滿的
    // 沒關係,我們會接著進行第二輪分配,知道沒有資源或者在當前的本地性級別下任務被分配完了,就跳出迴圈
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}

if (tasks.size > 0) {
  hasLaunchedTask = true
}
return tasks
}
  • 更新一些簿記量,如物理節點和executor的相互對映關係,機架和host的對映關係,host和executor上執行的任務資訊等等; 檢查是否有新的可用executor加入
  • 觸發黑名單的超時檢查,被加入黑明單的節點或executor是由一定超時時間的,在超時時間內不能像他們提交任務,而過了超時時間,這些資源將被重新投入使用; 根據最新的黑名單過濾掉在黑名單中的計算資源,包括host和executor
  • 通過排程池對所有的任務集按優先順序進行排序,獲取排序後的任務集
  • 對於每一個任務集,按照對executor進行round-robin的方式分配任務,會進行多輪分配,每一輪依次輪詢所有的executor,為每一個executor分配一個符合本地性要求的任務

TaskSchedulerImpl.resourceOfferSingleTaskSet

private def resourceOfferSingleTaskSet(
  taskSet: TaskSetManager,
  maxLocality: TaskLocality,
  shuffledOffers: Seq[WorkerOffer],
  availableCpus: Array[Int],
  tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
// 輪詢每一個executor,分別為其分配一個Task
for (i <- 0 until shuffledOffers.size) {
  val execId = shuffledOffers(i).executorId
  val host = shuffledOffers(i).host
  // 檢查這個executor上的cpu資源是否夠用
  if (availableCpus(i) >= CPUS_PER_TASK) {
    try {
      // 根據最大允許的本地性級別取出能夠在這個executor上執行的任務,
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        // 如果能夠找出一個可以在這個executor上執行的符合本地性要求的任務,
        // 將這個任務加入傳進來的集合中
        tasks(i) += task
        val tid = task.taskId
        taskIdToTaskSetManager(tid) = taskSet
        taskIdToExecutorId(tid) = execId
        executorIdToRunningTaskIds(execId).add(tid)
        availableCpus(i) -= CPUS_PER_TASK
        assert(availableCpus(i) >= 0)
        launchedTask = true
      }
    } catch {
      case e: TaskNotSerializableException =>
        logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
        // Do not offer resources for this task, but don't throw an error to allow other
        // task sets to be submitted.
        return launchedTask
    }
  }
}
return launchedTask
}

這個方法就是對所有可用的executor進行一輪round-robin方式的分配,一輪分配中,每個executor最多隻能得到一個任務,這樣做是為了儘量將任務“打散”,均勻第“撒到”所有executor上。

TaskSetManager.resourceOffer

@throws[TaskNotSerializableException]
def resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
// 首先檢查黑名單
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
  blacklist.isNodeBlacklistedForTaskSet(host) ||
    blacklist.isExecutorBlacklistedForTaskSet(execId)
}

if (!isZombie && !offerBlacklisted) {
  val curTime = clock.getTimeMillis()

  var allowedLocality = maxLocality

  // 根據本地性等待時間重新計算本地性級別
  if (maxLocality != TaskLocality.NO_PREF) {
    allowedLocality = getAllowedLocalityLevel(curTime)
    if (allowedLocality > maxLocality) {
      // We're not allowed to search for farther-away tasks
      allowedLocality = maxLocality
    }
  }

  // 找出一個在指定的本地性級別下,能夠在這個executor上執行的任務
  dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
    // Found a task; do some bookkeeping and return a task description
    val task = tasks(index)
    // 分配一個taskId
    val taskId = sched.newTaskId()
    // Do various bookkeeping
    // 更新一些簿記量
    copiesRunning(index) += 1
    // task的嘗試次數
    val attemptNum = taskAttempts(index).size
    val info = new TaskInfo(taskId, index, attemptNum, curTime,
      execId, host, taskLocality, speculative)
    taskInfos(taskId) = info
    // 記錄每次嘗試的任務資訊
    taskAttempts(index) = info :: taskAttempts(index)
    // Update our locality level for delay scheduling
    // NO_PREF will not affect the variables related to delay scheduling
    // 更新本地性資訊和事件資訊用於計算本地性等待時間
    // 而對於沒有本地性偏好的任務則不會影響這些簿記量
    if (maxLocality != TaskLocality.NO_PREF) {
      currentLocalityIndex = getLocalityIndex(taskLocality)
      lastLaunchTime = curTime
    }
    // Serialize and return the task
    // 對task進行序列化
    val serializedTask: ByteBuffer = try {
      ser.serialize(task)
    } catch {
      // If the task cannot be serialized, then there's no point to re-attempt the task,
      // as it will always fail. So just abort the whole task-set.
      case NonFatal(e) =>
        val msg = s"Failed to serialize task $taskId, not attempting to retry it."
        logError(msg, e)
        abort(s"$msg Exception during serialization: $e")
        throw new TaskNotSerializableException(e)
    }
    // 如果序列化後的體積超過指定閾值,那麼會列印一條警告資訊
    if (serializedTask.limit() > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
      !emittedTaskSizeWarning) {
      emittedTaskSizeWarning = true
      logWarning(s"Stage ${task.stageId} contains a task of very large size " +
        s"(${serializedTask.limit() / 1024} KB). The maximum recommended task size is " +
        s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
    }
    // 更新排程池中的執行任務統計的簿記量
    addRunningTask(taskId)

    // We used to log the time it takes to serialize the task, but task size is already
    // a good proxy to task serialization time.
    // val timeTaken = clock.getTime() - startTime
    val taskName = s"task ${info.id} in stage ${taskSet.id}"
    logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
      s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit()} bytes)")

    // 通過dagScheduler給事件匯流排頭第一個任務開始的事件
    sched.dagScheduler.taskStarted(task, info)
    // 封裝成一個TaskDescription物件,並返回給上層呼叫
    new TaskDescription(
      taskId,
      attemptNum,
      execId,
      taskName,
      index,
      addedFiles,
      addedJars,
      task.localProperties,
      serializedTask)
  }
} else {
  None
}
}

這個方法的作用是對給定的executor和本地性級別,分配一個符合要求的任務給這個executor。 最終任務被封裝成TaskDescription物件。

小結

對任務分配做一個小結。在給定的計算資源上分配合適的任務,這個工作主要是由TaskScheduler和TaskSetManager兩個類協同完成的。而任務本地性的維護與分配時的檢查工作是在TaskSetManager中完成的。
接下來,我們分析一下獲取到可以實際執行的任務後,排程後端是怎麼把這些任務傳送到制定的executor上執行的。

DriverEndpoint.makeOffers

首先還得接著回到DriverEndpoint.makeOffers方法,makeOffers方法中通過呼叫TaskSchedulerImpl.resourceOffers方法切入TaskSchedulerImpl,然後就是TaskSchedulerImpl在做任務分配的工作,最終TaskSchedulerImpl將分配好的任務以TaskDescription的封裝形式返回給DriverEndpoint(DriverEndpoint是排程後端的一個內部類),然後緊接著呼叫DriverEndpoint.launchTasks方法將這些任務傳給相應的executor執行。

DriverEndpoint.launchTasks

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
            "spark.rpc.message.maxSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit(), maxRpcMessageSize)
          // 注意,這裡只要有一個task體積超過閾值,就將該task所屬的taskSet取消掉
          // 為什麼這麼做呢?因為其實一個taskset中的所有task的體積都是一樣的,只是partition序號不同而已,
          // 所以一個task體積超過閾值,taskset中的其他task也必然超過閾值,
          // 所以沒有必要再嘗試其他的task, 直接把taskset取消掉更高效
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      // 維護cpu資源資訊
      executorData.freeCores -= scheduler.CPUS_PER_TASK

      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
        s"${executorData.executorHost}.")

      // 通過rpc傳送任務到指定的executor上
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}

這個方法跟簡單,主要就是對TaskDescription進行序列化,然後檢查體積是否超過閾值,如果沒超過閾值就呼叫rpc服務引用,將任務傳送到指定的executor上。

總結

好了,經過漫長的呼叫,終於我們的任務要離開driver,駛向executor了,回顧一下任務在driver中從誕生到最終傳送的過程,主要有一下幾個步驟:

  • DAGScheduler對作業計算鏈按照shuffle依賴劃分多個stage,提交一個stage根據個stage的一些資訊建立多個Task,包括ShuffleMapTask和ResultTask, 並封裝成一個任務集(TaskSet),把這個任務集交給TaskScheduler
  • TaskSchedulerImpl將接收到的任務集加入排程池中,然後通知排程後端SchedulerBackend
  • CoarseGrainedSchedulerBackend收到新任務提交的通知後,檢查下現在可用 executor有哪些,並把這些可用的executor交給TaskSchedulerImpl
  • TaskSchedulerImpl根據獲取到的計算資源,根據任務本地性級別的要求以及考慮到黑名單因素,按照round-robin的方式對可用的executor進行輪詢分配任務,經過多個本地性級別分配,多輪分配後最終得出任務與executor之間的分配關係,並封裝成TaskDescription形式返回給SchedulerBackend
  • SchedulerBackend拿到這些分配關係後,就知道哪些任務該發往哪個executor了,通過呼叫rpc介面將任務通過網路傳送即可。