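A Summary of Spark's Resource Scheduling Flow
While learning how Spark schedules resources in Standalone mode, I found that reading the source code is the most reliable way to understand any of Spark's mechanisms. This post walks through the relevant parts of the Spark 2.1.0 source.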
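I. How the Master controls Driver and Executor startup and allocates resources when an application is submitted
Start with the Master class in Master.scala. The resource scheduling flow begins in the receive method, at case RegisterApplication, which handles the registration sent when an application is submitted.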
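    case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      // 1. A Master in STANDBY state ignores the registration; only an active Master handles it.
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // 2. persistenceEngine is the persistence engine; it stores the application's information.
        persistenceEngine.addApplication(app)
        // 3. `driver` is the RpcEndpointRef of the endpoint that sent RegisterApplication
        //    (the application's ClientEndpoint), so this reply confirms the registration to
        //    the application side. It does not launch a Driver.
        driver.send(RegisteredApplication(app.id, self))
        // 4. The core of resource scheduling: schedule()
        schedule()
      }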
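Next comes the source of schedule(). The first thing it shows is how the Master controls Driver startup: this is the method in which the Master decides on which Worker to launch each Driver.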
    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * whenever a new application joins or the cluster's resources change.
     */
    private def schedule(): Unit = {
      // 1. The Master must be ALIVE to schedule resources (a standby Master does not schedule).
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      // 3. waitingDrivers is a buffer of DriverInfo
      //    (private val waitingDrivers = new ArrayBuffer[DriverInfo]). The DriverDescription
      //    in each DriverInfo states the memory and cores the Driver needs from a Worker.
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        // Walk the randomly shuffled Workers and launch the Driver on the first one that meets
        // its resource requirements (launchDriver(worker, driver)). Once a Worker is found,
        // the driver is removed from the waiting queue and the loop exits.
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()
    }
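The round-robin placement above can be tried out in isolation. The following is a minimal sketch, not Spark code: SimWorker, SimDriver and DriverPlacement are hypothetical stand-ins for WorkerInfo, DriverInfo and the loop in schedule(). It assumes the caller passes the workers in the order to be visited (Spark shuffles them first), and it deducts resources directly where Spark does so inside launchDriver.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo.
final case class SimWorker(id: String, var memoryFree: Int, var coresFree: Int)
final case class SimDriver(id: String, mem: Int, cores: Int)

object DriverPlacement {
  /** Returns driverId -> workerId for every driver that could be placed.
    * Placed drivers are removed from `waiting`, mirroring schedule(). */
  def place(workers: IndexedSeq[SimWorker],
            waiting: ArrayBuffer[SimDriver]): Map[String, String] = {
    val numWorkers = workers.size
    var curPos = 0 // shared across drivers, so the next search resumes here
    var placed = Map.empty[String, String]
    for (driver <- waiting.toList) { // iterate over a copy, since `waiting` is mutated
      var launched = false
      var visited = 0
      while (visited < numWorkers && !launched) {
        val worker = workers(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumption: resources are deducted here; Spark does this in launchDriver.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placed += (driver.id -> worker.id)
          waiting -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkers
      }
    }
    placed
  }
}
```

A driver that fits on no worker simply stays in the waiting buffer until the next call, which matches the behaviour of the loop above.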
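Now look at startExecutorsOnWorkers() to see how Executors are launched. In this method the Master sorts the usable Workers by free cores, then decides on which Workers to launch executors and how many resources each executor gets.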
    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      // 1. By default Spark launches executors for applications in FIFO order. All submitted
      //    applications sit in a waiting queue, first in, first out. The next application is
      //    allocated resources only after the ones ahead of it have been served.
      // 2. private[master] def coresLeft: Int = requestedCores - coresGranted
      //    An application is considered only while it still has requested cores that have
      //    not been granted.
      for (app <- waitingApps if app.coresLeft > 0) {
        // 3. The number of cores each Executor needs.
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // 4. sortBy(_.coresFree).reverse orders the Workers by free cores, most free first.
        // scheduleExecutorsOnWorkers decides how many cores to assign on each Worker, which
        // in turn determines how many executors each Worker runs and the cores per executor.
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
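To make the effect of spreadOutApps concrete, here is a simplified model of the per-worker core assignment. It is a sketch under stated assumptions, not the body of scheduleExecutorsOnWorkers: CoreAssignment and its parameters are hypothetical, it looks at cores only (no memory check, no limit on executor count), and it expects the free-core array to be sorted already, most free first.

```scala
object CoreAssignment {
  /** coresFree(i) is the number of free cores on usable worker i.
    * Returns the number of cores assigned on each worker.
    * Simplified model: cores only, handed out in units of coresPerExecutor. */
  def assign(coresLeft: Int,
             coresFree: Array[Int],
             coresPerExecutor: Int,
             spreadOut: Boolean): Array[Int] = {
    require(coresPerExecutor > 0, "coresPerExecutor must be positive")
    val assigned = new Array[Int](coresFree.length)
    // Never hand out more than the app still wants or the cluster still has.
    var toAssign = math.min(coresLeft, coresFree.sum)

    def canLaunch(pos: Int): Boolean =
      toAssign >= coresPerExecutor && coresFree(pos) - assigned(pos) >= coresPerExecutor

    var free = coresFree.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          toAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          // Spreading out: take one unit, then move on to the next worker.
          // Otherwise: keep filling this worker until it cannot take more.
          if (spreadOut) keepScheduling = false
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }
}
```

With 6 cores requested and workers offering 4, 4 and 2 free cores, spreading out yields 2 cores on each worker, while the consolidating mode fills the first worker and takes the remainder from the second.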
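II. How the Worker launches the Driver and the ExecutorBackend
The Worker starts the Driver process and the ExecutorBackend process by creating a separate Thread for each. The details are in the Worker class's receive method, under case LaunchDriver and case LaunchExecutor.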
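The "one thread per launched process" idea can be sketched as follows. This is a hypothetical model only: SimRunner is not a Spark class, the body function stands in for building and waiting on the child process, and the state strings are illustrative labels rather than Spark's DriverState or ExecutorState values.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical model of a runner that supervises one launched process.
// `body` stands in for "start the process and wait for its exit code".
final class SimRunner(name: String, body: () => Int) {
  val finalState = new AtomicReference[String]("LAUNCHING")

  // A dedicated thread keeps the caller's message loop from blocking
  // while the launched process runs.
  private val thread = new Thread(name) {
    override def run(): Unit = {
      val state =
        try {
          if (body() == 0) "FINISHED" else "FAILED"
        } catch {
          case _: Exception => "ERROR"
        }
      finalState.set(state)
    }
  }

  def start(): Unit = thread.start()
  def join(): Unit = thread.join()
}
```

The caller only invokes start() and returns immediately; the final state becomes visible once the thread has finished, which is why the sketch exposes join() for anyone who needs to wait.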
The figure below outlines how the Worker starts these two processes.
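III. How the Executor starts
CoarseGrainedExecutorBackend is a messaging endpoint (it implements ThreadSafeRpcEndpoint and ExecutorBackend). It can send messages to the Driver and receive messages from it, such as a request to launch a Task.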
    private[spark] class CoarseGrainedExecutorBackend(
        override val rpcEnv: RpcEnv,
        driverUrl: String,
        executorId: String,
        hostname: String,
        cores: Int,
        userClassPath: Seq[URL],
        env: SparkEnv)
      extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging
The onStart method registers the ExecutorBackend instance with the Driver, and case RegisteredExecutor in the receive method creates the Executor instance.
    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          // Each Executor corresponds one-to-one with a CoarseGrainedExecutorBackend.
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
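The pattern of creating the executor only after the registration reply arrives can be illustrated with a small stand-alone model. Everything here is hypothetical (SimBackend, SimExecutor and the three message types are not Spark classes); the sketch only shows a PartialFunction-based receive in which one backend owns at most one executor.

```scala
// Hypothetical message types standing in for the Driver's replies and requests.
sealed trait BackendMessage
case object Registered extends BackendMessage
final case class RegisterFailed(reason: String) extends BackendMessage
final case class LaunchTask(taskId: Long) extends BackendMessage

final class SimExecutor(val id: String) {
  var launched: Vector[Long] = Vector.empty
  def launch(taskId: Long): Unit = launched = launched :+ taskId
}

final class SimBackend(executorId: String) {
  // One backend owns at most one executor, created on successful registration.
  private var executor: Option[SimExecutor] = None
  var errors: List[String] = Nil

  def currentExecutor: Option[SimExecutor] = executor

  val receive: PartialFunction[Any, Unit] = {
    case Registered =>
      executor = Some(new SimExecutor(executorId))
    case RegisterFailed(reason) =>
      errors ::= s"registration failed: $reason"
    case LaunchTask(taskId) =>
      executor match {
        case Some(e) => e.launch(taskId)
        case None    => errors ::= s"task $taskId received before registration"
      }
  }
}
```

A task that arrives before registration has nowhere to run, so the sketch records an error for it; how a real backend reacts to that situation is outside what this model claims.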
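The Driver process has two crucial endpoints:
ClientEndpoint: mainly responsible for registering the current application with the Master.
DriverEndpoint: the driver-side endpoint that drives the application while it runs.
ExecutorData wraps the data the Driver keeps for each Executor.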
    /** Grouping of data for an executor used by CoarseGrainedSchedulerBackend. */
    private[cluster] class ExecutorData(
        val executorEndpoint: RpcEndpointRef,
        val executorAddress: RpcAddress,
        override val executorHost: String,
        var freeCores: Int,
        override val totalCores: Int,
        override val logUrlMap: Map[String, String])
      extends ExecutorInfo(executorHost, totalCores, logUrlMap)

    private val executorDataMap = new HashMap[String, ExecutorData]
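In ExecutorData, freeCores is a var while totalCores is fixed, which suggests per-executor core accounting. The sketch below shows one way such bookkeeping can work. It is an illustration under assumptions, not the scheduler backend's code: SimExecutorData, SimExecutorTable and the cpusPerTask parameter are hypothetical names.

```scala
import scala.collection.mutable.HashMap

// Hypothetical, trimmed-down counterpart of ExecutorData.
final class SimExecutorData(val host: String, var freeCores: Int, val totalCores: Int)

final class SimExecutorTable(cpusPerTask: Int) {
  private val executorDataMap = new HashMap[String, SimExecutorData]

  /** Adds an executor with all of its cores free; false if the ID is already known. */
  def add(executorId: String, host: String, cores: Int): Boolean =
    if (executorDataMap.contains(executorId)) {
      false
    } else {
      executorDataMap.put(executorId, new SimExecutorData(host, cores, cores))
      true
    }

  /** Reserves cores for one task if the executor has enough free. */
  def tryLaunchTask(executorId: String): Boolean =
    executorDataMap.get(executorId) match {
      case Some(d) if d.freeCores >= cpusPerTask =>
        d.freeCores -= cpusPerTask
        true
      case _ => false
    }

  /** Returns a finished task's cores, never exceeding totalCores. */
  def taskFinished(executorId: String): Unit =
    executorDataMap.get(executorId).foreach { d =>
      d.freeCores = math.min(d.totalCores, d.freeCores + cpusPerTask)
    }

  def freeCores(executorId: String): Option[Int] =
    executorDataMap.get(executorId).map(_.freeCores)
}
```

Keying the map by executor ID makes both the duplicate check and the per-task lookup a single hash access.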
In CoarseGrainedSchedulerBackend, case RegisterExecutor in the receiveAndReply method performs the actual registration.
    case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
      if (executorDataMap.contains(executorId)) {
        executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        context.reply(true)
      } else {
        // If the executor's rpc env is not listening for incoming connections, `hostPort`
        // will be null, and the client connection should be used to contact the executor.
        val executorAddress = if (executorRef.address != null) {
            executorRef.address
          } else {
            context.senderAddress
          }
        // Record the result of the ExecutorBackend registration.
        logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
        addressToExecutorId(executorAddress) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorRef.address, hostname,
          cores, cores, logUrls)
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        // Note on synchronized: the Driver receives registration requests from many
        // Executors, and this state is also read elsewhere, as the comment above says.
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (currentExecutorIdCounter <