1. 程式人生 > >spark中的動態executor分配

spark中的動態executor分配

動態分配executor的例項初始化部分

如果spark.executor.instances配置項設定為0或者沒有設定,這個預設情況下是一個未設定的值,yarn的執行模式時,這個配置通過--num-executors來得到.

同時spark.dynamicAllocation.enabled配置項設定為true時.預設值為false,表示啟用了動態分配executor.

在driver端SparkContext生成時,會檢查上面兩個配置項,如果這兩個配置滿足動態executor分配的要求時,會生成一個ExecutorAllocationManager例項.

_executorAllocationManager 

=if (dynamicAllocationEnabled) {Some(new ExecutorAllocationManager(thislistenerBus_conf))  } else {    None  }_executorAllocationManager.foreach(_.start())

必要的配置項:

1,配置項spark.dynamicAllocation.minExecutors,預設值0,最少分配的executor的個數.

2,配置項spark.dynamicAllocation.maxExecutors,預設值int.maxvalue.最大可分配的executor的個數.

3,配置項spark.dynamicAllocation.initialExecutors,預設值為配置項1的值,初始化時啟用的executor的個數,

4,1,配置項spark.dynamicAllocation.schedulerBacklogTimeout,預設值1s,如果未分配的task等待分配的時間超過了這個配置的時間,表示需要新啟動executor.

4,2,配置項spark.dynamicAllocation.sustainedSchedulerBacklogTimeout,預設是4,1,配置項的值,這個配置用於設定在初始排程的executor排程延時後,每次的等待超時時間.

5,配置項spark.dynamicAllocation.executorIdleTimeout,預設值60s,executor的空閒回收時間.

6,配置項spark.executor.cores的配置(executor-cores)必須大於或等於配置項spark.task.cpus的值(這個配置預設是1,這是每個task需要的cpu的個數).

7,配置項spark.shuffle.service.enabled必須配置為true,預設為false.如果這個配置設定為true時,BlockManager例項生成時,需要讀取spark.shuffle.service.port配置項配置的shuffle的埠,同時對應BlockManager的shuffleClient不在是預設的BlockTransferService例項,而是ExternalShuffleClient例項.

8,初始化時,ExecutorAllocationManager中的屬性initializing預設值為true,表示定時排程時,什麼都不做.

在執行ExecutorAllocationManager中的start函式時:

def start(): Unit = {

這裡把ExecutorAllocationListener例項(內部實現類)新增到sparkContext中的listenerBus中,用於監聽stage,task的啟動與完成,並做對應的操作.  listenerBus.addListener(listener)val scheduleTask = new Runnable() {override def run(): Unit = {try {        schedule()      } catch {case ct: ControlThrowable =>throw ctcase t: Throwable =>          logWarning(s"Uncaught exception in thread 

${Thread.currentThread().getName}"t)      }    }  }

定時100ms執行一次schedule的排程函式,來進行task的分析.executor.scheduleAtFixedRate(scheduleTask0intervalMillis

TimeUnit.MILLISECONDS)}

executor個數分配的計算

針對task的排程主要由一個定時器每100ms進行一次schedule函式的呼叫.

private def schedule(): Unit = synchronized {

在這個函式中,首先得到當前的時間,val now = clock.getTimeMillis

在呼叫這個函式時,初始情況下,initializing的屬性值為true,這個時候,這個函式什麼也不做.

這個函式的內容,後面在進行分析.  updateAndSyncNumExecutorsTarget(now)

這個removeTimes集合中記錄有每一個executor沒有被task佔用後的時間,如果這個時間超過了上面配置的idle的時間,會移出掉這個executor,同時設定initializing屬性為false,表示可以繼續進行task的排程.retain函式只保留未超時的executor.removeTimes.retain { case (executorIdexpireTime) =>val expired = now >= expireTimeif (expired) {initializing falseremoveExecutor(executorId)    }    !expired  }}

如何知道stage被提交?看下面,

在SparkContext中,執行runJob命令時,針對一個stage進行submit操作時,會呼叫listenerBus中所有的listener對應的onStageSubmitted函式.

而在ExecutorAllocationManager進行start操作時,生成了一個listener,例項為ExecutorAllocationListener,並把這個listener新增到了listenerBus中.

接下來看看ExecutorAllocationListener中對應stage提交的監聽處理:

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted)

Unit = {

這裡首先把initializing的屬性值設定為false,表示下次定時排程時,需要執行executor的分配操作.initializing false

得到進行submit操作對應的stage的id與stage中對應的task的個數.  val stageId = stageSubmitted.stageInfo.stageIdval numTasks = stageSubmitted.stageInfo.numTasks  allocationManager.synchronized {

通過對應的stageId設定這個stage的task的個數,儲存到stageIdToNumTasks集合中.stageIdToNumTasks(stageId) = numTasks

這裡更新allocationManager中的addTime的時間,

由當前時間加上配置spark.dynamicAllocation.schedulerBacklogTimeout的超時時間.    allocationManager.onSchedulerBacklogged()

這裡根據每個task對應的host,計算出每個host對應的task的個數,numTasksPending的個數原則上應該與stage中numTask的個數相同.// Compute the number of tasks requested by the stage on each hostvar numTasksPending = 0val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>if (!locality.isEmpty) {        numTasksPending += 1locality.foreach { location =>val count = hostToLocalTaskCountPerStage.getOrElse(location.host0) + 1hostToLocalTaskCountPerStage(location.host) = count        }      }    }

在對應的集合中,根據stageid與pending的task的個數,對應的host與host對應的task的個數進行儲存.stageIdToExecutorPlacementHints.put(stageId,(numTasksPendinghostToLocalTaskCountPerStage.toMap))

下面的函式迭代stageIdToExecutorPlacementHints集合中的values,並更新allocationManager中localityAwareTasks屬性(儲存待啟動的task的個數)與hostToLocalTaskCount集合屬性(儲存host對應的task的個數)的值.新增到這裡,主要是executor啟動時對應的排程啟動task// Update the executor placement hintsupdateExecutorPlacementHints()  }}

接面看看allocationManager中定時排程的updateAndSyncNumExecutorsTarget函式:

現在來說說updateAndSyncNumExecutorsTarget函式與addExecutors函式的作用:

示例說明:

假定這次的stage需要的executor的個數為5,numExecutorsTarget的配置保持預設值0,

如果是第一次排程啟動時,在updateAndSyncNumExecutorsTarget函式中:

1,先計算出這個stage需要的executor的個數,

val maxNeeded = maxNumExecutorsNeededif (initializing) {

如果函式進行這裡,表示還沒有stage提交,也就是沒有job被執行.不進行排程.// Do not change our target while we are still initializing,    // Otherwise the first job may have to ramp up unnecessarily0}

2,進入的流程為else if (addTime != NOT_SET && now >= addTime)部分.這個時候執行addExecutors函式,(這裡假定時間已經達到了addTime的超時時間)

這種情況下預設的初始executor的個數為0的情況下,在當前時間超過了等待超時時間後,會進入,第一次時需要等待一秒鐘,每次執行會更新等待時間.

這裡根據要stage對應的task需要的executor的個數,並執行addExecutors的函式.

else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded)    logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS 1000delta}

在addExecutors函式中,先計算出目標的executor的個數(屬性numExecutorsTarget),

// Do not request more executors if it would put our target over the upper boundif (numExecutorsTarget >= maxNumExecutors) {    logDebug(s"Not adding executors because our current target total " +s"is already $numExecutorsTarget (limit $maxNumExecutors)")numExecutorsToAdd 1return 0}val oldNumExecutorsTarget = numExecutorsTarget// There's no point in wasting time ramping up to the number of executors we already have, so  // make sure our target is at least as much as our current allocation:numExecutorsTarget = math.max(numExecutorsTargetexecutorIds.size)// Boost our target with the number to add for this round:numExecutorsTarget += numExecutorsToAdd// Ensure that our target doesn't exceed what we need at the present moment:numExecutorsTarget = math.min(numExecutorsTargetmaxNumExecutorsNeeded)// Ensure that our target fits within configured bounds:numExecutorsTarget = math.max(math.min(numExecutorsTargetmaxNumExecutors)

minNumExecutors)

此時executorIds的長度為0,集合是個空集合,這個時候numExecutorsToAdd的值為預設的1,根據上面的程式碼計算完成後(maxNumExecutorsNeeded為5就是tasks需要的executor的個數),numExecutorsTarget的值為1.接下來計算出來一個值,如果這次任務的目標executor的個數高於上次tasks啟動的目標executor的個數,delta的值是一個大於0的值.根據上面的說明,下面程式碼中這個delta的值為1,val delta = numExecutorsTarget - oldNumExecutorsTarget// If our target has not changed, do not send a message  // to the cluster manager and reset our exponential growthif (delta == 0) {

如果delta等於0,表示這次的目標executor的個數,與上次任務的executor的個數相同,重置增量的個數為1.numExecutorsToAdd 1return 0}接下來,通過下面的程式碼通過SparkContext發起numExecutorsTarget的executor的啟動,並在executor中載入對應的task的個數.

val addRequestAcknowledged = testing ||    client.requestTotalExecutors(numExecutorsTargetlocalityAwareTasks

hostToLocalTaskCount)

接下來,由於我們的任務執行還需要的executor的個數還需要4個(共需要),同時這個時候,delta的值為1,與numExecutorsToAdd的屬性值相同,因此numExecutorsToAdd的值會*2.

    numExecutorsToAdd if (delta == numExecutorsToAdd) {numExecutorsToAdd 2else {1}

3,排程定時器開始執行第二次排程啟動,這個時候執行updateAndSyncNumExecutorsTarget函式時,numExecutorsTarget的值為1,需要的executor的個數為3,因此,還是會執行時間超時的流程.

else if (addTime != NOT_SET && now >= addTime) {val delta = addExecutors(maxNeeded)    logDebug(s"Starting timer to add more executors (to " +s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")addTime += sustainedSchedulerBacklogTimeoutS 1000delta}

再次進入addExecutors函式,這個時候numExecutorsToAdd屬性值為2,numExecutorsTarget屬性值為1,executorsIds的size為1,已經有一個executor被啟動,需要的executor的個數為3,最後計算完成後,numExecutorsTarget屬性的值為3.計算出來當前的numExecutorsTarget與上一次的numExecutorsTarget的delta的值為2,開始根據這個值為3的numExecutorsTarget發起task的啟動請求.

接下來,由於計算出來的delta的值為2,而屬性numExecutorsToAdd的值也為2,

因此numExecutorsToAdd屬性值現在還是需要*2操作.執行完成後,最後這個numExecutorsToAdd屬性值修改成了4.

4,這個時候,由於還有部分task沒有被執行,開始第三次的處理,此時,numExecutorTarget的值還是小於目標的executor的個數,接著執行addExecutors函式,此時,executorsIds的size為4,第一次執行一個,第二次啟動了3個,這個時候,numExecutorsTarget的屬性值變化情況:

首先先修改成4,(取numExecutorsTarget與executorIds.size中的最大值),

然後numExecutorsTarget += numExecutorsToAdd的值,這個時候值修改成了8.

最後與共需要的executor的個數5取最小值,把值修改成5.計算出當前的numExecutorsTarget與上一次的numExecutorsTarget的差值為2,numExecutorsToAdd的值為4,因此重新修改numExecutorsToAdd的值為1.

這個時候排程程式會修改addTime的值為NOT_SET,表示不在執行executor的排程.因為executor已經夠了.

5,現在假定spark.dynamicAllocation.initialExecutors配置項配置有一個值,初始值為6.需要的executor的個數還是是5.這個時候,進入updateAndSyncNumExecutorsTarget函式時,執行如下的流程部分,因為初始的executor的個數大於了需要的executor的個數.這部分流程在設定有初始大小的executor個數或者說要執行的job的第二個stage的task的個數需要的executor的個數小於小次stage需要的executor的個數時,會被執行.

else if (maxNeeded < numExecutorsTarget) {// The target number exceeds the number we actually need, so stop adding new  // executors and inform the cluster manager to cancel the extra pending requestsval oldNumExecutorsTarget = numExecutorsTarget  numExecutorsTarget = math.max(maxNeededminNumExecutors)numExecutorsToAdd 1// If the new target has not changed, avoid sending a message to the cluster managerif (numExecutorsTarget < oldNumExecutorsTarget) {    client.requestTotalExecutors(numExecutorsTargetlocalityAwareTasks

hostToLocalTaskCount)    logDebug(s"Lowering target number of executors to $numExecutorsTarget

      (previously " +s"$oldNumExecutorsTarget) because not all requested executors are actually 

      needed")  }numExecutorsTarget - oldNumExecutorsTarget}

在上面的程式碼中,重新根據需要的executor的個數,計算出numExecutorsTarget的值,這個時候,新的numExecutorsTarget的值為5,而老的numExecutorsTarget的值為6,因此通過新的numExecutorsTarget直接呼叫SparkContext中對應的啟動executor的函式,發起對executor的排程與task的啟動.

通過SparkContext排程executor

allocationManager,executor進行動態的呼叫後,會執行如下的程式碼片斷.

client.requestTotalExecutors(numExecutorsTargetlocalityAwareTasks

hostToLocalTaskCount)

在上面的程式碼中,client就是SparkContext例項.

下面看看這個函式的處理流程:

函式的傳入引數中:

numExecutors是目標的executor的個數,

第二個是共需要的task的個數,

第三個是host->taskCount的集合.

private[spark] override def requestTotalExecutors(    numExecutors: Int,localityAwareTasks: Int,hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]  ): Boolean = {  schedulerBackend match {case b: CoarseGrainedSchedulerBackend =>

這裡直接呼叫了GoarseGrainedSchedulerBackend中對應的函式.      b.requestTotalExecutors(numExecutorslocalityAwareTasks

hostToLocalTaskCount)case _ =>      logWarning("Requesting executors is only supported in coarse-grained mode")false}}

下面看看GoarseGrainedSchedulerBackendrequestTotalExecutors的函式實現:

final override def requestTotalExecutors(    numExecutors: Int,localityAwareTasks: Int,hostToLocalTaskCount: Map[String, Int]  ): Boolean = synchronized {if (numExecutors < 0) {throw new IllegalArgumentException("Attempted to request a negative number of executor(s) " +s"$numExecutors from the cluster manager. Please specify a positive number!")  }this.localityAwareTasks = localityAwareTasksthis.hostToLocalTaskCount = hostToLocalTaskCount

每次執行時,計算出還需要的共需要的executor的個數與正在執行或者等待回收的executor的個數之間的差值,這個差值是還需要啟動的executor的個數.numPendingExecutors =    math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size

0)

這裡根據具體的cluster的部署模式(yarn,standalone,mesos,等),呼叫對應的函式進行executor的啟動操作.這裡我們看看standalone的操作.由SparkDeploySchedulerBackend實現.

這個函式的實現主要是通過向master傳送一個RequestExecutors訊息,這個訊息是一個需要響應的訊息.

這個訊息在Master中通過receiveAndReply函式中的RequestExecutors部分進行處理.  doRequestTotalExecutors(numExecutors)}

Master中處理executor的申請:

caseRequestExecutors(appIdrequestedTotal) =>context.reply(handleRequestExecutors(appIdrequestedTotal))

看看這個的handleRequestExecutors函式

private def handleRequestExecutors(appId: StringrequestedTotal: Int)

Boolean = {idToApp.get(appId) match {

這個函式中,根據傳入的app對應的job共依賴的executor的個數,更新appInfo中executorLimit的值.並執行對executor的啟動的排程.case Some(appInfo) =>      logInfo(s"Application $appId requested to set total executors to 

$requestedTotal.")      appInfo.executorLimit = requestedTotal

在這個排程的過程中通過startExecutorsOnWorkers函式來排程與啟動executor在對應的worker中,

在判斷要啟動的executor的個數時,會根據scheduleExecutorsOnWorkers函式來判斷executor的個數是否達到要求的appInfo.executorLimit的個數,如果達到指定的executor的個數時,排程不再執行executor的啟動.判斷worker是否有足夠的資源啟動executor時,通過對executor需要的cpu core的個數與executor需要的記憶體來判斷worker是否有足夠的對應資源啟動executor,如果有,表示這個worker可以用來啟動executor,迭代所有的worker進行executor的啟動,當已經啟動的executor的個數達到了appInfo的executorLimit的限制時,不在進行分配.      schedule()true    case None =>      logWarning(s"Unknown application $appId requested 

$requestedTotal total executors.")false}}

Driver端處理