Dynamic executor allocation in Spark
Initialization of the dynamic executor allocation instance
If the spark.executor.instances configuration is set to 0 or not set at all (it is unset by default; in YARN mode this value comes from --num-executors),
and spark.dynamicAllocation.enabled is set to true (default false), dynamic executor allocation is enabled.
When the SparkContext is created on the driver side, these two configurations are checked. If they satisfy the requirements for dynamic executor allocation, an ExecutorAllocationManager instance is created and held in the following field:
_executorAllocationManager
Required configurations (a minimal configuration sketch follows this list):
1. spark.dynamicAllocation.minExecutors, default 0: the minimum number of executors to allocate.
2. spark.dynamicAllocation.maxExecutors, default Int.MaxValue: the maximum number of executors that can be allocated.
3. spark.dynamicAllocation.initialExecutors, default is the value of item 1: the number of executors started initially.
4.1. spark.dynamicAllocation.schedulerBacklogTimeout, default 1s: if unassigned tasks have been waiting longer than this timeout, new executors need to be started.
4.2. spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, default is the value of 4.1: the timeout applied to each subsequent round after the initial backlog timeout has fired.
5. spark.dynamicAllocation.executorIdleTimeout, default 60s: how long an executor may stay idle before it is reclaimed.
6. spark.executor.cores (executor-cores) must be greater than or equal to spark.task.cpus (default 1, the number of CPUs each task needs).
7. spark.shuffle.service.enabled must be set to true (default false). When it is true, the BlockManager reads the shuffle port from spark.shuffle.service.port at creation time, and its shuffleClient is no longer the default BlockTransferService instance but an ExternalShuffleClient instance.
8. At initialization, the initializing property of ExecutorAllocationManager defaults to true, which means the periodic schedule does nothing yet.
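The following is a minimal configuration sketch for enabling dynamic allocation, built only from the configuration keys listed above; the numeric values and the application name are illustrative, and the same keys can equally be passed with --conf on spark-submit.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo")                         // illustrative name
  .set("spark.dynamicAllocation.enabled", "true")                // turn dynamic allocation on
  .set("spark.shuffle.service.enabled", "true")                  // required, see item 7
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")

// The ExecutorAllocationManager instance is created inside this constructor
// when the preconditions above are met.
val sc = new SparkContext(conf)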
When the start function of ExecutorAllocationManager runs:
def start(): Unit = {
  // Add the ExecutorAllocationListener instance (an inner class) to the
  // SparkContext's listenerBus so that stage/task start and completion events
  // can be observed and acted upon.
  listenerBus.addListener(listener)

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }

  // Run the schedule function every 100 ms (intervalMillis) to analyze the tasks.
  executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
Calculating how many executors to allocate
Task scheduling is driven mainly by a timer that calls the schedule function every 100 ms.
private def schedule(): Unit = synchronized {
  // First get the current time.
  val now = clock.getTimeMillis

  // On the very first calls, initializing is still true and this function does
  // nothing. updateAndSyncNumExecutorsTarget is analyzed further below.
  updateAndSyncNumExecutorsTarget(now)

  // removeTimes records, for each executor, the time at which it stopped being
  // used by tasks. If that time has exceeded the configured idle timeout, the
  // executor is removed and initializing is set to false so that task scheduling
  // can continue. retain keeps only the executors that have not yet expired.
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      removeExecutor(executorId)
    }
    !expired
  }
}
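As a side note, here is a tiny standalone illustration (not Spark code) of the retain semantics used above, assuming Scala 2.11/2.12 where mutable.HashMap.retain is available; the executor ids and timestamps are made up.

import scala.collection.mutable

val removeTimes = mutable.HashMap("exec-1" -> 1000L, "exec-2" -> 5000L)   // executorId -> expiry time
val now = 2000L
// Keep only the executors whose expiry time has not been reached yet.
removeTimes.retain { case (_, expireTime) => now < expireTime }
println(removeTimes)   // Map(exec-2 -> 5000)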
How do we know that a stage has been submitted? See below.
In SparkContext, when runJob submits a stage, the onStageSubmitted function of every listener registered on the listenerBus is invoked.
When ExecutorAllocationManager starts, it creates a listener, an ExecutorAllocationListener instance, and adds it to the listenerBus.
Now look at how ExecutorAllocationListener handles the stage-submission event:
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  // Set initializing to false so that the next scheduled run performs executor allocation.
  initializing = false

  // Get the id of the submitted stage and the number of tasks in it.
  val stageId = stageSubmitted.stageInfo.stageId
  val numTasks = stageSubmitted.stageInfo.numTasks

  allocationManager.synchronized {
    // Record the number of tasks for this stage, keyed by stage id, in stageIdToNumTasks.
    stageIdToNumTasks(stageId) = numTasks

    // Update addTime in allocationManager to the current time plus the
    // spark.dynamicAllocation.schedulerBacklogTimeout timeout.
    allocationManager.onSchedulerBacklogged()

    // Compute the number of tasks requested by the stage on each host; in
    // principle numTasksPending should equal the stage's numTasks.
    var numTasksPending = 0
    val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
      if (!locality.isEmpty) {
        numTasksPending += 1
        locality.foreach { location =>
          val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
          hostToLocalTaskCountPerStage(location.host) = count
        }
      }
    }

    // Store, per stage id, the number of pending tasks together with the
    // host -> task count map.
    stageIdToExecutorPlacementHints.put(stageId,
      (numTasksPending, hostToLocalTaskCountPerStage.toMap))

    // Update the executor placement hints: this iterates over the values of
    // stageIdToExecutorPlacementHints and updates localityAwareTasks (the number
    // of tasks waiting to be launched) and hostToLocalTaskCount (tasks per host)
    // in allocationManager. These are used for locality-aware task launching
    // when executors start.
    updateExecutorPlacementHints()
  }
}
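To make the placement-hint computation concrete, here is a simplified, self-contained sketch (not the listener itself) of how the per-host task counts are derived from locality preferences; the Location case class is a stand-in for Spark's TaskLocation, and the hosts are made up.

import scala.collection.mutable

case class Location(host: String)

val taskLocalityPreferences: Seq[Seq[Location]] = Seq(
  Seq(Location("host1"), Location("host2")),   // task 0 prefers host1 or host2
  Seq(Location("host1")),                      // task 1 prefers host1
  Seq.empty                                    // task 2 has no locality preference
)

var numTasksPending = 0
val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
taskLocalityPreferences.foreach { locality =>
  if (locality.nonEmpty) {
    numTasksPending += 1
    locality.foreach { location =>
      val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
      hostToLocalTaskCountPerStage(location.host) = count
    }
  }
}
// numTasksPending == 2, hostToLocalTaskCountPerStage == Map(host1 -> 2, host2 -> 1)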
Next, let's look at the updateAndSyncNumExecutorsTarget function that the allocationManager timer calls periodically.
What do updateAndSyncNumExecutorsTarget and addExecutors actually do?
A worked example:
Assume this stage needs 5 executors and numExecutorsTarget keeps its default value of 0.
On the first scheduled run, inside updateAndSyncNumExecutorsTarget:
1. First compute the number of executors this stage needs:

val maxNeeded = maxNumExecutorsNeeded
if (initializing) {
  // If execution reaches this branch, no stage has been submitted yet, i.e. no
  // job is running, so no scheduling is performed.
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
}
2. Execution enters the else if (addTime != NOT_SET && now >= addTime) branch and calls addExecutors (assuming the addTime timeout has already been reached).
With the default initial executor count of 0, this branch is entered once the current time exceeds the wait timeout; the first round waits one second, and the wait time is extended on every run.
addExecutors is then called with the number of executors the stage's tasks need:
else if (addTime != NOT_SET && now >= addTime) {
  val delta = addExecutors(maxNeeded)
  logDebug(s"Starting timer to add more executors (to " +
    s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
  addTime += sustainedSchedulerBacklogTimeoutS * 1000
  delta
}
Inside addExecutors, the target number of executors (the numExecutorsTarget property) is computed first:
// Do not request more executors if it would put our target over the upper bound
if (numExecutorsTarget >= maxNumExecutors) {
  logDebug(s"Not adding executors because our current target total " +
    s"is already $numExecutorsTarget (limit $maxNumExecutors)")
  numExecutorsToAdd = 1
  return 0
}

val oldNumExecutorsTarget = numExecutorsTarget
// There's no point in wasting time ramping up to the number of executors we already have, so
// make sure our target is at least as much as our current allocation:
numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
// Boost our target with the number to add for this round:
numExecutorsTarget += numExecutorsToAdd
// Ensure that our target doesn't exceed what we need at the present moment:
numExecutorsTarget = math.min(numExecutorsTarget, maxNumExecutorsNeeded)
// Ensure that our target fits within configured bounds:
numExecutorsTarget = math.max(math.min(numExecutorsTarget, maxNumExecutors), minNumExecutors)
At this point executorIds is empty (its size is 0) and numExecutorsToAdd has its default value of 1, so after the computation above (maxNumExecutorsNeeded is 5, the number of executors the tasks need) numExecutorsTarget is 1. Next a delta is computed: if this round's target is higher than the previous round's target, delta is greater than 0. Given the numbers above, delta here is 1.

val delta = numExecutorsTarget - oldNumExecutorsTarget

// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
  // This round's target equals the previous round's target, so reset the
  // increment to 1.
  numExecutorsToAdd = 1
  return 0
}

Next, the following code asks the SparkContext to request numExecutorsTarget executors in total, on which the corresponding tasks will be launched:

val addRequestAcknowledged = testing ||
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)

Since the job still needs 4 more executors (5 in total), and delta (1) equals numExecutorsToAdd, numExecutorsToAdd is doubled:

numExecutorsToAdd = if (delta == numExecutorsToAdd) {
  numExecutorsToAdd * 2
} else {
  1
}
3. The timer fires a second time. When updateAndSyncNumExecutorsTarget runs, numExecutorsTarget is 1, which is still below the number of executors needed, so the timeout branch is executed again:

else if (addTime != NOT_SET && now >= addTime) {
  val delta = addExecutors(maxNeeded)
  logDebug(s"Starting timer to add more executors (to " +
    s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
  addTime += sustainedSchedulerBacklogTimeoutS * 1000
  delta
}

addExecutors is entered again. This time numExecutorsToAdd is 2, numExecutorsTarget is 1, and executorIds has size 1 (one executor has already been started), while 5 executors are needed in total. After the computation numExecutorsTarget becomes 3. The delta between the new numExecutorsTarget and the previous one is 2, and a request is sent for the new target of 3 executors.
Since the computed delta (2) equals numExecutorsToAdd (2), numExecutorsToAdd is doubled again and becomes 4.
4. Because some tasks have still not been launched, a third round begins. numExecutorsTarget is still below the number of executors needed, so addExecutors runs again. At this point executorIds has size 4 (one executor from the first round, three from the second), and numExecutorsTarget changes as follows:
first it is raised to 4 (the maximum of numExecutorsTarget and executorIds.size),
then numExecutorsToAdd is added, raising it to 8,
and finally it is capped at the 5 executors needed in total, so it becomes 5. The delta between the new and old numExecutorsTarget is 2, while numExecutorsToAdd is 4, so numExecutorsToAdd is reset to 1 (a sketch reproducing this ramp-up follows the worked example).
The scheduler then sets addTime back to NOT_SET, meaning executor scheduling stops because enough executors have been requested.
5. Now assume spark.dynamicAllocation.initialExecutors is set to 6 while the number of executors needed is still 5. When updateAndSyncNumExecutorsTarget runs, the branch below executes, because the initial executor count is larger than the number needed. This branch is taken whenever a configured initial executor count exceeds the demand, or when the next stage of the job needs fewer executors than the previous stage did.
else if (maxNeeded < numExecutorsTarget) {
  // The target number exceeds the number we actually need, so stop adding new
  // executors and inform the cluster manager to cancel the extra pending requests
  val oldNumExecutorsTarget = numExecutorsTarget
  numExecutorsTarget = math.max(maxNeeded, minNumExecutors)
  numExecutorsToAdd = 1

  // If the new target has not changed, avoid sending a message to the cluster manager
  if (numExecutorsTarget < oldNumExecutorsTarget) {
    client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
    logDebug(s"Lowering target number of executors to $numExecutorsTarget (previously " +
      s"$oldNumExecutorsTarget) because not all requested executors are actually needed")
  }
  numExecutorsTarget - oldNumExecutorsTarget
}
In the code above, numExecutorsTarget is recomputed from the number of executors actually needed. The new numExecutorsTarget is 5 while the old one is 6, so the new value is passed directly to the corresponding SparkContext function, which adjusts the executor request and lets the tasks be launched.
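The following standalone sketch reproduces the ramp-up arithmetic of steps 1 to 4 above, under the simplifying assumption that every requested executor is granted before the next round (so the executor count equals the previous target): with 5 executors needed, the target grows 1 -> 3 -> 5 while the increment doubles 1 -> 2 -> 4 and is finally reset to 1.

object RampUpDemo extends App {
  val maxNeeded = 5          // executors needed by the stage
  var numExecutorsTarget = 0
  var numExecutorsToAdd = 1
  var executorCount = 0      // assume every requested executor is granted

  for (round <- 1 to 3) {
    val oldTarget = numExecutorsTarget
    numExecutorsTarget = math.max(numExecutorsTarget, executorCount)   // at least what we already have
    numExecutorsTarget += numExecutorsToAdd                            // boost by this round's increment
    numExecutorsTarget = math.min(numExecutorsTarget, maxNeeded)       // never above the demand
    val delta = numExecutorsTarget - oldTarget
    numExecutorsToAdd = if (delta == numExecutorsToAdd) numExecutorsToAdd * 2 else 1
    executorCount = numExecutorsTarget
    println(s"round $round: target=$numExecutorsTarget delta=$delta nextAdd=$numExecutorsToAdd")
  }
}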
Scheduling executors through SparkContext
After allocationManager decides to adjust the number of executors dynamically, the following snippet runs:
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
In the code above, client is the SparkContext instance.
Let's look at how this function is handled. Its parameters are:
numExecutors, the target number of executors;
localityAwareTasks, the total number of tasks;
hostToLocalTaskCount, the host -> task count map.
private[spark] override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: scala.collection.immutable.Map[String, Int]
  ): Boolean = {
  schedulerBackend match {
    case b: CoarseGrainedSchedulerBackend =>
      // Delegate directly to the corresponding function in CoarseGrainedSchedulerBackend.
      b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount)
    case _ =>
      logWarning("Requesting executors is only supported in coarse-grained mode")
      false
  }
}
Now look at the requestTotalExecutors implementation in CoarseGrainedSchedulerBackend:
final override def requestTotalExecutors(
    numExecutors: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int]
  ): Boolean = synchronized {
  if (numExecutors < 0) {
    throw new IllegalArgumentException(
      "Attempted to request a negative number of executor(s) " +
        s"$numExecutors from the cluster manager. Please specify a positive number!")
  }
  this.localityAwareTasks = localityAwareTasks
  this.hostToLocalTaskCount = hostToLocalTaskCount

  // On each call, compute the difference between the total number of executors
  // needed and those already running or pending removal; this difference is the
  // number of executors still to be started.
  numPendingExecutors =
    math.max(numExecutors - numExistingExecutors + executorsPendingToRemove.size, 0)

  // Depending on the cluster deploy mode (YARN, standalone, Mesos, ...), the
  // corresponding backend starts the executors. For standalone this is implemented
  // by SparkDeploySchedulerBackend: it sends a RequestExecutors message to the
  // Master and waits for the reply; the Master handles this message in the
  // RequestExecutors case of its receiveAndReply function.
  doRequestTotalExecutors(numExecutors)
}
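As a quick numeric check of the formula above (the numbers are hypothetical): with a requested total of 5 executors, 2 executors already running, and 1 executor pending removal, the backend still has to start 4 executors.

val numPendingExecutors = math.max(5 - 2 + 1, 0)   // == 4 executors still to be started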
How the Master handles the executor request:
case RequestExecutors(appId, requestedTotal) =>
  context.reply(handleRequestExecutors(appId, requestedTotal))
Let's look at the handleRequestExecutors function:
private def handleRequestExecutors(appId: String, requestedTotal: Int): Boolean = {
  idToApp.get(appId) match {
    // Update executorLimit in the appInfo with the total number of executors the
    // application's job depends on, then schedule executor launches.
    case Some(appInfo) =>
      logInfo(s"Application $appId requested to set total executors to $requestedTotal.")
      appInfo.executorLimit = requestedTotal
      schedule()
      true
    case None =>
      logWarning(s"Unknown application $appId requested $requestedTotal total executors.")
      false
  }
}

During this scheduling, startExecutorsOnWorkers launches executors on the appropriate workers. When deciding how many executors to start, scheduleExecutorsOnWorkers checks whether the number of executors has reached appInfo.executorLimit; once the limit is reached, no more executors are launched. A worker qualifies for launching an executor only if it has enough CPU cores and enough memory for the executor. All workers are iterated over to launch executors, and allocation stops as soon as the number of launched executors reaches appInfo.executorLimit, as sketched below.
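The following much-simplified sketch illustrates the idea just described; it is not the Master's actual scheduleExecutorsOnWorkers code. A worker is usable only if it has enough free cores and memory for one executor, and allocation never exceeds the application's executorLimit. WorkerInfo and its field names are stand-ins chosen for this illustration.

case class WorkerInfo(host: String, freeCores: Int, freeMemoryMb: Int)

def selectWorkers(
    workers: Seq[WorkerInfo],
    coresPerExecutor: Int,
    memoryPerExecutorMb: Int,
    executorLimit: Int,
    executorsRunning: Int): Seq[WorkerInfo] = {
  workers
    // Keep only workers with enough free cores and memory for one executor.
    .filter(w => w.freeCores >= coresPerExecutor && w.freeMemoryMb >= memoryPerExecutorMb)
    // Never launch more executors than the application's remaining limit allows.
    .take(math.max(executorLimit - executorsRunning, 0))
}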
Handling on the driver side