How Does the Spark Master Allocate Cluster Resources?

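This article uses the Spark 1.6 source code to explain how the Spark Master allocates cluster resources. The Master reallocates cluster resources every time it receives a Register worker message from a Worker, a Register driver request from a Client, a Register Application or LaunchExecutor request, and also whenever a driver or executor finishes and releases its resources. So how does the Master actually allocate resources? Let us peel back the layers one at a time. The entry point of resource scheduling is the schedule() method of the Master class: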

/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // Only the ALIVE (active) Master schedules; any other Master does nothing.
  if (state != RecoveryState.ALIVE) { return }
  // Drivers take strict precedence over executors, so driver resources are allocated first.
  // Randomization helps balance drivers: the workers are shuffled to spread the load.
  val shuffledWorkers = Random.shuffle(workers)
  // Walk the shuffled workers; on each one, try the waiting drivers in registration order.
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // ALIVE workers only
    for (driver <- waitingDrivers) { // every driver waiting for resources, first in, first out
      // If the worker's free resources (cores and memory) cover what the driver requests,
      // give those resources to the driver.
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver) // launch the driver on this worker
        waitingDrivers -= driver // the driver is no longer waiting for resources
      }
    }
  }
  startExecutorsOnWorkers() // then allocate executors on the workers
}

The code that allocates resources to a driver is as follows:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  // The Master sends a message through its EndpointRef for the Worker,
  // and the Worker launches the driver.
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING // mark the driver that just received resources as running
}

// WorkerInfo class
def addDriver(driver: DriverInfo) {
  drivers(driver.id) = driver // add to the set of drivers running on this worker
  memoryUsed += driver.desc.mem // update the memory in use
  coresUsed += driver.desc.cores // update the cores in use
}

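That completes this round of allocating resources to drivers. Overall, the Master gives drivers priority. Each call to schedule() walks the shuffled ALIVE workers and, on each worker, checks every waiting driver in registration order: if the worker's free cores and memory cover what the driver needs, the Master allocates them on that worker and launches the driver there. A driver that fits on no worker stays in the waiting list until the next call to schedule().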
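The driver placement loop described above can be tried outside Spark with a small model. This is only a sketch under stated assumptions: Worker, Driver, and placeDrivers are hypothetical stand-ins (not Spark classes), free resources are decremented directly instead of going through WorkerInfo.addDriver, and the inner loop iterates over a snapshot of the waiting list so that removing a driver during iteration is safe.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object DriverPlacementSketch {
  // Hypothetical, simplified stand-ins for WorkerInfo and DriverInfo.
  final class Worker(val id: String, var coresFree: Int, var memoryFree: Int)
  final case class Driver(id: String, cores: Int, mem: Int)

  /** Places waiting drivers on shuffled workers; returns driverId -> workerId. */
  def placeDrivers(
      workers: Seq[Worker],
      waiting: ArrayBuffer[Driver],
      rng: Random): Map[String, String] = {
    var placements = Map.empty[String, String]
    for (worker <- rng.shuffle(workers)) {
      // Snapshot of the queue: drivers are tried in registration (FIFO) order.
      for (driver <- waiting.toList) {
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.coresFree -= driver.cores   // models coresUsed += driver.desc.cores
          worker.memoryFree -= driver.mem    // models memoryUsed += driver.desc.mem
          waiting -= driver                  // no longer waiting
          placements = placements.updated(driver.id, worker.id)
        }
      }
    }
    placements
  }
}
```

With a single worker that has 4 free cores and three waiting drivers that each need 2 cores, the first two drivers are placed and the third stays in the waiting list for the next round.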

Next comes allocating resources to executors. The code is as follows:

/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Apps are scheduled in registration order: whichever app registered first is given
  // as many resources as possible first.
  // coresLeft is the number of cores the app still needs; the Master may not grant
  // all of an app's cores in a single round.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor // cores needed per executor
    // Filter out workers that don't have enough resources to launch an executor
    // Keep the ALIVE workers that can host at least one executor,
    // sorted by free cores in descending order.
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Compute how many cores each usable worker contributes in this round
    // (a worker may still end up with nothing assigned).
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // Only workers that were actually assigned cores launch executors.
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) // the actual allocation
    }
  }
}
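The worker filter at the top of startExecutorsOnWorkers can be illustrated in isolation. The following is a minimal sketch, assuming a hypothetical Worker case class with an alive flag in place of WorkerInfo and WorkerState; only the filter and sort logic mirrors the listing above.

```scala
object UsableWorkersSketch {
  // Hypothetical stand-in for WorkerInfo; `alive` models state == WorkerState.ALIVE.
  final case class Worker(id: String, alive: Boolean, coresFree: Int, memoryFree: Int)

  /** Workers that can host at least one executor, most free cores first. */
  def usableWorkers(
      workers: Seq[Worker],
      memoryPerExecutorMB: Int,
      coresPerExecutor: Option[Int]): Seq[Worker] =
    workers
      .filter(_.alive)
      .filter { w =>
        w.memoryFree >= memoryPerExecutorMB &&
          w.coresFree >= coresPerExecutor.getOrElse(1) // at least 1 core if unset
      }
      .sortBy(_.coresFree)
      .reverse // descending by free cores
}
```

Note that sortBy(_.coresFree).reverse puts the worker with the most free cores first, so the scheduler visits the emptiest workers before the fuller ones.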

Let us first look at the code that actually allocates resources to executors:

/**
 * Allocate a worker's resources to one or more executors.
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker for this application
 * @param coresPerExecutor number of cores per executor
 * @param worker the worker info
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // Number of executors to launch on this worker: assigned cores / cores per executor.
  // If cores per executor is not set, launch a single executor on this worker that takes
  // all the cores assigned to the worker in this round.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  // Cores given to each executor; if not set, all the cores assigned on this worker.
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING // the app counts as running once it has an executor
  }
}
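The two expressions that split a worker's assigned cores into executors are easy to check by hand. The helper below is a hypothetical illustration (executorCores is not a Spark method); it returns the core count of each executor that would be launched on one worker.

```scala
object ExecutorSplitSketch {
  /** Core counts of the executors launched on one worker for `assignedCores`. */
  def executorCores(assignedCores: Int, coresPerExecutor: Option[Int]): Seq[Int] = {
    // Same arithmetic as in allocateWorkerResourceToExecutors.
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    Seq.fill(numExecutors)(coresToAssign)
  }
}
```

For example, 8 assigned cores with coresPerExecutor = Some(2) gives four executors of 2 cores each, while 8 assigned cores with coresPerExecutor = None gives one executor of 8 cores.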

// ApplicationInfo class
private[master] def addExecutor(
    worker: WorkerInfo,
    cores: Int,
    useID: Option[Int] = None): ExecutorDesc = {
  val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
  executors(exec.id) = exec // add to this app's set of executors
  coresGranted += cores // update the number of cores granted to this app
  exec
}

// Master class
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Tell the Worker to actually reserve the resources and start the executor.
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Tell the driver so that it can update its executor metadata.
  exec.application.driver.send(ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

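Now for the core of resource allocation: how the Master takes the free resources of the whole cluster into account when allocating resources to executors. The code is as follows: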

/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an application's
 * executors on as many workers as possible, while the second does the opposite (i.e. launch them
 * on as few workers as possible). The former is usually better for data locality purposes and is
 * the default.
 *
 * The number of cores assigned to each executor is configurable. When this is explicitly set,
 * multiple executors from the same application may be launched on the same worker if the worker
 * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
 * worker by default, in which case only one executor may be launched on each worker.
 *
 * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
 * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
 * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
 * allocated at a time, 12 cores from each worker would be assigned to each executor.
 * Since 12 < 16, no executors would launch [SPARK-8881].
 */
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor // cores per executor, if configured
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // not configured: at least 1 core
  val oneExecutorPerWorker = coresPerExecutor.isEmpty // whether each worker gets a single executor
  val memoryPerExecutor = app.desc.memoryPerExecutorMB // memory per executor, in MB
  // Number of usable workers (they already meet the minimum requirements and are
  // sorted by free cores in descending order).
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
  val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
  // The smaller of: the cores the app still needs, and the total free cores on usable workers.
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the specified worker can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    // The app still needs at least one executor's worth of cores, so keep scheduling.
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    // The worker's free cores minus the cores already assigned to it in this round
    // still cover the minimum cores for one executor.
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

    // If we allow multiple executors per worker, then we can always launch new executors.
    // Otherwise, if there is already an executor on this worker, just give it more cores.
    // A new executor is launched if coresPerExecutor is set (several executors per worker are
    // allowed), or if this worker has not yet been given an executor for this app in this round.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) { // both memory and cores must be sufficient
      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
      // The worker's free memory minus the memory already assigned covers one more executor.
      val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
      // Executors assigned in this round plus the app's existing executors stay
      // below the app's executor limit.
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      // All four conditions must hold for this worker to launch an executor for the app.
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // We're adding cores to an existing executor, so no need
      // to check memory and executor limits
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no more workers can accommodate any
  // more executors, or if we have reached this application's limits
  var freeWorkers = (0 until numUsable).filter(canLaunchExecutor) // workers that can still accept work
  while (freeWorkers.nonEmpty) {
    freeWorkers.foreach { pos =>
      var keepScheduling = true
      while (keepScheduling && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor // cores still to assign shrink by this grant
        assignedCores(pos) += minCoresPerExecutor // cores assigned to this worker in this round grow

        if (oneExecutorPerWorker) {
          // One executor per worker: the count stays at 1 and later iterations
          // only add cores to that executor.
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }

        // Spreading out an application means spreading out its executors across as
        // many workers as possible. If we are not spreading out, then we should keep
        // scheduling executors on this worker until we use all of its resources.
        // Otherwise, just move on to the next worker.
        if (spreadOutApps) {
          keepScheduling = false
        }
      }
    }
    freeWorkers = freeWorkers.filter(canLaunchExecutor) // workers still eligible for the next pass
  }
  assignedCores
}
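The SPARK-8881 example in the doc comment above can be reproduced with a few lines of arithmetic. The sketch below is not the Spark implementation: roundRobin and executorsLaunched are hypothetical helpers that hand out cores in fixed-size chunks, ignoring memory and executor limits, so that the effect of the chunk size alone is visible.

```scala
object Spark8881Sketch {
  /** Hands out `total` cores round-robin in chunks of `step`, within each worker's free cores. */
  def roundRobin(freeCores: Array[Int], total: Int, step: Int): Array[Int] = {
    require(step > 0, "step must be positive")
    val assigned = new Array[Int](freeCores.length)
    var left = math.min(total, freeCores.sum)
    var progressed = true
    while (progressed) {
      progressed = false
      for (pos <- freeCores.indices) {
        if (left >= step && freeCores(pos) - assigned(pos) >= step) {
          assigned(pos) += step
          left -= step
          progressed = true
        }
      }
    }
    assigned
  }

  /** Whole executors of `coresPerExecutor` cores that fit in the per-worker assignment. */
  def executorsLaunched(assigned: Array[Int], coresPerExecutor: Int): Int =
    assigned.map(_ / coresPerExecutor).sum
}
```

With four workers of 16 free cores each and 48 cores requested, a chunk size of 1 leaves 12 cores on every worker, which is not enough for a single 16-core executor. A chunk size of 16 puts 16 cores on each of three workers, so three executors can start.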

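To sum up: on every call to schedule(), the Master allocates resources to the registered apps in FIFO order. For each app, it loops over the workers whose free resources can host at least one executor and assigns cores to them round-robin. There are three ways of allocating:

1. SpreadOut method (default)

On each pass, every worker that can still host an executor is given just one executor. This repeats until the workers visited can no longer supply the resources for another executor, or the app's limits are reached.

2. Non-SpreadOut method

On each pass, every worker that can host an executor is given as many executors as its available resources allow.

3. One-executor-per-worker method

If the app does not specify the number of cores per executor, each worker hosts at most one executor for the app. The Master loops over the workers that meet the requirements; when it reaches a worker that has no executor assigned yet, it assigns one, and later visits only add cores to that executor.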
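The three methods above can be compared side by side with a simplified model of the core-assignment loop. This is a sketch under stated assumptions, not Spark's code: Worker and App are hypothetical case classes, the app is assumed to have no executors yet (so the executor-limit check only counts executors assigned in this round), and the workers are assumed to be pre-filtered and pre-sorted.

```scala
object ExecutorSchedulingSketch {
  // Hypothetical, simplified stand-ins for WorkerInfo and ApplicationInfo.
  final case class Worker(coresFree: Int, memoryFree: Int)
  final case class App(
      coresLeft: Int,
      coresPerExecutor: Option[Int],
      memoryPerExecutorMB: Int,
      executorLimit: Int = Int.MaxValue)

  /** Returns the number of cores assigned on each worker, by position. */
  def assignCores(app: App, workers: Array[Worker], spreadOut: Boolean): Array[Int] = {
    val minCores = app.coresPerExecutor.getOrElse(1)
    val oneExecutorPerWorker = app.coresPerExecutor.isEmpty
    val assignedCores = new Array[Int](workers.length)
    val assignedExecutors = new Array[Int](workers.length)
    var coresToAssign = math.min(app.coresLeft, workers.map(_.coresFree).sum)

    def canLaunch(pos: Int): Boolean = {
      val enoughCores =
        coresToAssign >= minCores && workers(pos).coresFree - assignedCores(pos) >= minCores
      if (!oneExecutorPerWorker || assignedExecutors(pos) == 0) {
        // A new executor also needs memory and room under the executor limit.
        val usedMemory = assignedExecutors(pos) * app.memoryPerExecutorMB
        enoughCores &&
          workers(pos).memoryFree - usedMemory >= app.memoryPerExecutorMB &&
          assignedExecutors.sum < app.executorLimit
      } else {
        enoughCores // only adding cores to the worker's single executor
      }
    }

    var free = workers.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keepGoing = true
        while (keepGoing && canLaunch(pos)) {
          coresToAssign -= minCores
          assignedCores(pos) += minCores
          if (oneExecutorPerWorker) assignedExecutors(pos) = 1
          else assignedExecutors(pos) += 1
          if (spreadOut) keepGoing = false // move on to the next worker
        }
      }
      free = free.filter(canLaunch)
    }
    assignedCores
  }
}
```

Take three workers with 8 free cores each and an app that still needs 8 cores. With coresPerExecutor = Some(2), spreading out assigns 4, 2, and 2 cores, while not spreading out assigns all 8 cores on the first worker. With coresPerExecutor = None and spreading out, the cores are handed out one at a time and the workers end up with 3, 3, and 2 cores, each as a single executor.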

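Because the Master allocates cluster resources in FIFO order, a large job can hold resources for a long time while small jobs wait a long time because they cannot get resources promptly. The scheduling is therefore not very fine-grained, and it could borrow ideas from operating-system resource scheduling.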
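The FIFO concern can be made concrete with a toy model. The sketch below is an illustration only: App and grantFifo are hypothetical, and it considers cores alone, ignoring memory, executor sizes, and the fact that resources are released over time.

```scala
object FifoStarvationSketch {
  final case class App(name: String, coresWanted: Int)

  /** Grants cores strictly in queue order; returns (app name, cores granted). */
  def grantFifo(queue: Seq[App], totalCores: Int): Seq[(String, Int)] = {
    var remaining = totalCores
    queue.map { app =>
      val granted = math.min(app.coresWanted, remaining)
      remaining -= granted
      app.name -> granted
    }
  }
}
```

On a 48-core cluster, if an app asking for 64 cores registers before an app asking for 2 cores, the first app takes all 48 cores and the second gets none until something is released. In the opposite order, the small app gets its 2 cores and the large one gets the remaining 46.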