
Spark Resource Scheduling


I: The difference between task scheduling and resource scheduling

Task scheduling refers to the scheduling of jobs, carried out by DAGScheduler, TaskScheduler, and SchedulerBackend.

Resource scheduling refers to how an application acquires resources; it is carried out by the schedule method.

II: Resource scheduling demystified

Because the Master is responsible for resource management and scheduling, the resource-scheduling method schedule lives in the Master class (Master.scala). schedule is invoked whenever an application registers or the available resources change, for example when something registers with the Master (workers, drivers, and applications all register with the Master; note that executors register with SparkDeploySchedulerBackend instead):

case RegisterApplication(description, driver) => {
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
}
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */

This method is invoked every time a new application joins or the available resources change (for example, when executors or workers are added or removed).

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return } // If the Master's state is not ALIVE, scheduling is pointless
  // Drivers take strict precedence over executors
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers; shuffling the workers aids load balancing
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // Check the worker's state; only ALIVE workers take part in resource allocation
    for (driver <- waitingDrivers) { // Iterate over the waiting drivers; this applies to cluster mode only, since in client mode the driver is started automatically on the client
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // When the worker's free memory and CPU exceed what the driver needs,
        // launch the driver on this randomly chosen worker
        launchDriver(worker, driver)
        waitingDrivers -= driver // Remove the launched driver from the waiting queue
      }
    }
  }
  startExecutorsOnWorkers()
}

Code walkthrough of schedule (the simple points are covered in the code comments above):

1 Random.shuffle(workers) shuffles the order of the workers held in the Master's cached data structure
 
def shuffle[T, CC[X] <: TraversableOnce[X]](xs: CC[T])(implicit bf: CanBuildFrom[CC[T], T, CC[T]]): CC[T] = {
  val buf = new ArrayBuffer[T] ++= xs // Build a temporary buffer array
 
  def swap(i1: Int, i2: Int) { // Swap the two elements at the given indices
    val tmp = buf(i1)
    buf(i1) = buf(i2)
    buf(i2) = tmp
  }
 
  for (n <- buf.length to 2 by -1) { // Generate random indices and keep swapping (Fisher-Yates), shuffling the order of the elements
    val k = nextInt(n)
    swap(n - 1, k)
  }
 
  (bf(xs) ++= buf).result // Return the new, shuffled collection (here, the collection of workers)
}
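As a quick illustration of the shuffle step (using hypothetical worker identifiers rather than Spark's WorkerInfo objects), each call returns a new, randomly ordered collection, which is why drivers do not always land on the first registered worker:

import scala.util.Random

object ShuffleDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical worker ids standing in for the Master's `workers` collection
    val workers = Seq("worker-1", "worker-2", "worker-3", "worker-4")
    println(Random.shuffle(workers)) // e.g. List(worker-3, worker-1, worker-4, worker-2)
    println(Random.shuffle(workers)) // a different random order on each call
  }
}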

2 waitingDrivers

private val waitingDrivers = new ArrayBuffer[DriverInfo]
 
As you can see, waitingDrivers is an array whose elements are DriverInfo. DriverInfo carries the driver's information: startTime (when it was launched), id, desc (the driver description), and submitDate (when it was submitted).
 
private[deploy] class DriverInfo(
    val startTime: Long,
    val id: String,
    val desc: DriverDescription,
    val submitDate: Date)
  extends Serializable {
 
The description contains the following fields:
 
private[deploy] case class DriverDescription(
    jarUrl: String,      // URL of the application jar
    mem: Int,            // Memory required
    cores: Int,          // CPU cores required
    supervise: Boolean,  // When spark-submit runs the driver in cluster mode with supervise enabled, the driver is automatically restarted if it dies
    command: Command) {  // Launch command and environment information
 
  override def toString: String = s"DriverDescription (${command.mainClass})"
}

3 launchDriver: Spark can only proceed with the rest of the scheduling after the driver has been launched

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver) // Record which worker the driver runs on
  driver.worker = Some(worker) // The driver and the worker now reference each other
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) // The Master sends a remote RPC instruction telling the worker to start the driver
  driver.state = DriverState.RUNNING // After launching, mark the driver's state as RUNNING
}
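Putting schedule and launchDriver together, the following is a minimal, self-contained sketch of the driver-placement loop. The Worker and Driver case classes here are hypothetical stand-ins for Spark's WorkerInfo and DriverInfo, and the LaunchDriver RPC is reduced to a println; this illustrates the pattern, not Spark's actual code.

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo
case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

object DriverPlacementSketch {
  def main(args: Array[String]): Unit = {
    val workers = Seq(Worker("w1", 4096, 4), Worker("w2", 8192, 8))
    val waitingDrivers = ArrayBuffer(Driver("d1", 2048, 2), Driver("d2", 6144, 4))

    // Mirrors schedule(): shuffle the workers, then place waiting drivers FIFO
    for (worker <- Random.shuffle(workers)) {
      for (driver <- waitingDrivers.toList) { // iterate over a snapshot so the buffer can be mutated
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          println(s"Launching ${driver.id} on ${worker.id}") // stands in for the LaunchDriver RPC
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          waitingDrivers -= driver
        }
      }
    }
    println(s"Still waiting: ${waitingDrivers.map(_.id).mkString(", ")}")
  }
}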

4 startExecutorsOnWorkers: simple scheduling using a FIFO queue. By default Spark launches executors in FIFO order: resources are allocated to the next application only after the previous application's resource needs have been satisfied.

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    // Before allocating executors, check whether the application still needs cores;
    // if not, no executors are allocated for it
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // cores the application requests per executor
    // Filter out workers that don't have enough resources to launch an executor:
    // a worker must be ALIVE and have at least as much free memory and as many free cores
    // as one executor needs. The remaining workers are sorted by free cores in descending
    // order, so the best-provisioned workers are used first.
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // spreadOutApps spreads the application across as many nodes as possible, which often
    // also gives better data locality, since data is usually distributed across machines;
    // it is the default. scheduleExecutorsOnWorkers returns the number of cores assigned
    // to each worker, satisfying as much of the remaining demand as possible.
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them.
    // The Master sends a remote instruction to each Worker to start the ExecutorBackend
    // process, and sends an ExecutorAdded message to the driver.
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
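To make the spreadOutApps idea concrete, here is a simplified, self-contained sketch (for illustration only; it is not Spark's actual scheduleExecutorsOnWorkers, which also accounts for memory and per-executor core counts). In spread-out mode, cores are handed out round-robin across the usable workers; in the non-spread-out mode, one worker is filled before moving on to the next.

object SpreadOutSketch {
  /** Assign `coresNeeded` cores across workers; round-robin when spreadOut is true. */
  def assignCores(coresNeeded: Int, workerFreeCores: Array[Int], spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(workerFreeCores.length)(0)
    var left = coresNeeded
    var pos = 0
    while (left > 0 && assigned.zip(workerFreeCores).exists { case (a, f) => a < f }) {
      if (assigned(pos) < workerFreeCores(pos)) {
        assigned(pos) += 1
        left -= 1
      }
      // Spread-out: move to the next worker after every core; otherwise stay until this worker is full
      if (spreadOut || assigned(pos) == workerFreeCores(pos)) {
        pos = (pos + 1) % workerFreeCores.length
      }
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array(4, 4, 4)
    println(assignCores(6, freeCores, spreadOut = true).mkString(","))  // 2,2,2
    println(assignCores(6, freeCores, spreadOut = false).mkString(",")) // 4,2,0
  }
}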
