Reading the Source Code of Spark's Runtime Message Communication (Part 2)
Overview
(Spark version: 2.1.1)
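Application: a user program built on Spark, consisting of one Driver Program and multiple Executors in the cluster;
Driver Program: runs the Application's main() function and creates the SparkContext; the SparkContext is commonly used to stand for the Driver Program;
Executor: a process launched for an Application on a Worker Node. It runs Tasks and keeps data in memory or on disk. Each Application has its own, independent Executors;
Cluster Manager: an external service for acquiring resources on the cluster (e.g. Standalone, Mesos or YARN);
Operation: the operations applied to RDDs, divided into Transformations and Actions;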
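Roles
Taking standalone mode as an example, Spark's execution can be divided into three parts:
1. Client; 2. Master; 3. Worker
Breaking this down further:
1. The client can be refined into: Driver, SparkContext
2. The Master is just the Master
3. The Worker can be refined into: Executor
Breaking it down once more:
1. The SparkContext in the client can be refined into: DAGScheduler, TaskScheduler
2. The Master is still the Master
3. The Worker can be refined into: a thread pool, TaskRunner
As a diagram, it looks roughly like this: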
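The concrete process when a user submits an application:
1. Submit the Spark job: spark-submit submits the application.
2. When spark-submit is used in Standalone mode, it creates and constructs a DriverActor process.
3. The Driver runs the user's code. When it reaches the creation of the SparkContext in the main function, it builds the runtime environment of the Spark Application.
4. The two most important things the SparkContext object does during initialization are constructing the DAGScheduler and the TaskScheduler.
5. The TaskScheduler (which has its own background process) uses that background process to connect to the Master and register the Application with it.
6. After receiving the Application's registration request, the Master returns a registration result to the Client, and the Client marks the Application as registered. The Master then uses its own resource-scheduling algorithm to launch multiple Executors for this Application on several Workers in the Spark cluster (the executor process was named StandaloneExecutorBackend in early Spark releases; in 2.1.1 it is CoarseGrainedExecutorBackend, as the code below shows).
7. The Master notifies the Workers to launch Executors.
8. Each Worker launches Executors for the Application.
9. Once started, each Executor process registers itself back with the TaskScheduler inside the SparkContext of this Application, so the TaskScheduler knows which Executors serve the current Application. After that, the Executor sends heartbeats to the Driver (not to the Master) and waits for the Driver to assign it Tasks.
10. After all Executors have registered back with the Driver, the Driver finishes initializing the SparkContext and continues running the user's code.
11. Every action that is executed creates a job.
12. The job is submitted to the DAGScheduler.
13. The DAGScheduler splits the job into multiple stages (the stage-division algorithm) and creates a TaskSet for each stage.
14. Each TaskSet is submitted to the TaskScheduler.
15. The TaskScheduler submits every task in the TaskSet to an executor for execution: when it receives a TaskSet, it sends the Tasks to the executors that previously registered with it (the task-assignment algorithm).
16. The Executor process has a thread pool. For each task it receives, it wraps the task in a TaskRunner, takes a thread from the pool, and runs the task.
17. The TaskRunner copies and deserializes the user's code, i.e. the operators and functions to execute, and then runs the task. (There are two kinds of Task, ShuffleMapTask and ResultTask. Only the last stage uses ResultTask; all earlier stages use ShuffleMapTask.)
18. In the end, then, a Spark application runs by submitting its stages, batch by batch, as TaskSets to the executors. Each task handles one partition of an RDD and applies the operators and functions we defined. Applying them to the initial RDD produces a new RDD; if the next operators belong to the same stage, the same tasks go on to apply the operators and functions defined on that second RDD, and so on. When a stage finishes, the next stage runs, and so on through the job until every operation has completed.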
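To make steps 13 to 18 more concrete, here is a toy model of how a chain of operators is cut into stages. It is a minimal sketch under simplifying assumptions: the `Op`, `Stage` and `StagePlanner.planStages` names are hypothetical, the lineage is a straight line rather than a general DAG, and each operator is simply flagged as needing a shuffle or not. The real DAGScheduler walks RDD dependencies backwards from the final RDD; this sketch only reproduces the outcome described above: a new stage at every shuffle boundary, one task per partition, ResultTasks in the last stage and ShuffleMapTasks in all earlier ones.

```scala
// Hypothetical model of one step in an RDD lineage: the operator either keeps
// the existing partitioning (narrow) or needs a shuffle (wide).
final case class Op(name: String, shuffle: Boolean)

// One planned stage: the operators it pipelines, its task type and task count.
final case class Stage(id: Int, ops: List[String], taskType: String, numTasks: Int)

object StagePlanner {
  /** Cut a linear chain of operators into stages: every shuffle operator
    * starts a new stage, each stage gets one task per partition, the last
    * stage runs ResultTasks and all earlier stages run ShuffleMapTasks. */
  def planStages(ops: List[Op], numPartitions: Int): List[Stage] = {
    // Build the groups in reverse order (prepending to a List is O(1)).
    val reversedGroups = ops.foldLeft(List.empty[List[String]]) {
      case (Nil, op)                  => List(List(op.name))
      case (groups, op) if op.shuffle => List(op.name) :: groups
      case (current :: rest, op)      => (op.name :: current) :: rest
    }
    val groups = reversedGroups.map(_.reverse).reverse
    groups.zipWithIndex.map { case (names, i) =>
      val taskType = if (i == groups.size - 1) "ResultTask" else "ShuffleMapTask"
      Stage(i, names, taskType, numPartitions)
    }
  }
}
```

For example, a word-count-like chain textFile, flatMap, map, reduceByKey, mapValues over 4 partitions yields two stages of 4 tasks each: stage 0 (ShuffleMapTask) pipelines textFile, flatMap and map, and stage 1 (ResultTask) holds reduceByKey and mapValues.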
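The 18 steps above can be simplified as follows:
1. The SparkContext (the client) is initialized in the main method. It sends an application-registration message to the Master (which can also be thought of as the resource manager) and requests resources to run Executors (this is the standalone case; on YARN it would be the ResourceManager). The Master returns a registration result to the Client, and the Client marks the Application as registered.
2. Based on the application's resource requirements, the Master chooses Workers, allocates Executor resources on them and starts StandaloneExecutorBackend;
3. Once an Executor has started, it registers with the SparkContext (client) and then periodically sends heartbeats to the Driver (carrying metrics of its running tasks) while waiting for Tasks;
4. When an action is triggered on an RDD in the SparkContext, the RDD's DAG is built, the DAGScheduler divides it into stages and turns them into TaskSets, and the TaskSets are sent to the TaskScheduler;
5. The TaskScheduler sends Tasks to the registered Executors, and the SparkContext ships the application code to the Executors. When an Executor receives a task message, it starts and runs the task (in other words, tasks run inside the Executor);
6. Finally, once all tasks have finished, the Driver processes the results and releases the resources. (The Driver both requests and releases resources.)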
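The simplified flow can also be read as a sequence of messages. The sketch below lists them in order for one application with one executor per Worker and one task per executor. The case classes are simplified stand-ins named after the Spark messages discussed in this article (RegisterApplication, RegisteredApplication, LaunchExecutor, ExecutorStateChanged, ExecutorUpdated, RegisterExecutor, RegisteredExecutor, LaunchTask, StatusUpdate); their fields, the endpoint labels and the `StandaloneFlow.trace` function are hypothetical and do not match Spark's real signatures. "AppClient" and "Driver" both live in the driver process; in real Spark the messages are sent concurrently, so this fixed order is only one possible interleaving.

```scala
// Simplified, hypothetical stand-ins for the standalone-mode messages named
// in this article; the real Spark case classes carry different fields.
sealed trait Message
final case class RegisterApplication(appName: String) extends Message
final case class RegisteredApplication(appId: String) extends Message
final case class LaunchExecutor(appId: String, execId: Int) extends Message
final case class ExecutorStateChanged(appId: String, execId: Int, state: String) extends Message
final case class ExecutorUpdated(execId: Int, state: String) extends Message
final case class RegisterExecutor(execId: Int) extends Message
final case class RegisteredExecutor(execId: Int) extends Message
final case class LaunchTask(execId: Int, taskId: Long) extends Message
final case class StatusUpdate(execId: Int, taskId: Long, state: String) extends Message

// One message on the wire: who sends it, who receives it, and what it is.
final case class Envelope(from: String, to: String, msg: Message)

object StandaloneFlow {
  /** Hypothetical message trace for one app, one executor per Worker and
    * one task per executor. */
  def trace(appName: String, numWorkers: Int): List[Envelope] = {
    val appId = s"app-$appName"
    val execIds = (0 until numWorkers).toList
    // Step 1: application registration with the Master.
    val registration = List(
      Envelope("AppClient", "Master", RegisterApplication(appName)),
      Envelope("Master", "AppClient", RegisteredApplication(appId)))
    // Step 2: the Master has each Worker launch an executor.
    val executorLaunch = execIds.flatMap { e =>
      List(
        Envelope("Master", s"Worker-$e", LaunchExecutor(appId, e)),
        Envelope(s"Worker-$e", "Master", ExecutorStateChanged(appId, e, "RUNNING")),
        Envelope("Master", "AppClient", ExecutorUpdated(e, "RUNNING")))
    }
    // Steps 3-5: executors register with the driver, receive and finish tasks.
    val executorWork = execIds.flatMap { e =>
      List(
        Envelope(s"Executor-$e", "Driver", RegisterExecutor(e)),
        Envelope("Driver", s"Executor-$e", RegisteredExecutor(e)),
        Envelope("Driver", s"Executor-$e", LaunchTask(e, e.toLong)),
        Envelope(s"Executor-$e", "Driver", StatusUpdate(e, e.toLong, "FINISHED")))
    }
    registration ++ executorLaunch ++ executorWork
  }
}
```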
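Code flow diagram:
Execution flow diagram:
Characteristics of Spark's runtime architecture:
- Each Application gets its own executor processes, which stay alive for the lifetime of the Application and run tasks in multiple threads. This isolation between Applications has advantages both for scheduling (each Driver schedules its own tasks) and for execution (Tasks from different Applications run in different JVMs). It also means that Spark Applications cannot share data with one another unless the data is written to an external storage system.
- Spark is agnostic to the resource manager: all it needs is to obtain executor processes and keep them communicating with each other.
- The Client that submits the SparkContext should be close to the Worker nodes (the nodes running Executors), ideally in the same rack, because the SparkContext and the Executors exchange a large amount of information while a Spark Application runs. If you want to run against a remote cluster, it is better to use RPC to submit the SparkContext to the cluster than to run the SparkContext far away from the Workers.
- Tasks use data locality and speculative execution as optimizations.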
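Code flow:
a) The client creates a thread pool for registering the Application with the Master
In the tryRegisterAllMasters method of ClientEndpoint (a private class of StandaloneAppClient), a registration thread pool, registerMasterThreadPool, is created. Registration threads are started in this pool and send a RegisterApplication message to the Master. The code is as follows:
Class: StandaloneAppClient.ClientEndpoint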
/**
 * Register with all masters asynchronously and returns an array `Future`s for cancellation.
 */
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // With HA there may be several Masters, so iterate over all of them and send the message to each
  for (masterAddress <- masterRpcAddresses) yield {
    // Submit a registration task to the pool; it returns as soon as it sees registered == true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) {
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        // Get a reference to the Master endpoint and send the RegisterApplication message
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {
        case ie: InterruptedException => // Cancelled
        case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
      }
    })
  }
}
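The pattern in tryRegisterAllMasters (one registration task per master address, submitted to a pool, each checking a shared `registered` flag first) can be tried in isolation. This is a minimal sketch assuming the RPC send is replaced by a plain callback; `RegistrationSketch`, `send` and `onRegistered` are hypothetical names, and a fixed-size `java.util.concurrent` pool stands in for Spark's registerMasterThreadPool.

```scala
import java.util.concurrent.{Executors, Future => JFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch: `send` stands in for masterRef.send(RegisterApplication(...)).
final class RegistrationSketch(masters: Seq[String], send: String => Unit) {
  // Shared flag, set once any master confirms the registration.
  val registered = new AtomicBoolean(false)
  private val pool = Executors.newFixedThreadPool(math.max(1, masters.size))

  /** Submit one registration task per master; the futures allow cancellation. */
  def tryRegisterAllMasters(): Seq[JFuture[_]] =
    masters.map { master =>
      pool.submit(new Runnable {
        // Skip the send if some earlier reply already marked us as registered.
        override def run(): Unit = if (!registered.get) send(master)
      })
    }

  /** Called when a RegisteredApplication reply arrives from any master. */
  def onRegistered(): Unit = registered.set(true)

  def shutdown(): Unit = {
    pool.shutdownNow()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Once onRegistered() has set the flag (as the RegisteredApplication handler in step c does), later registration attempts return without contacting any Master.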
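b) The Master receives the Application registration, completes it and replies to the Client, and asks Workers to launch Executors
When the Master receives the application-registration message, its registerApplication method records the application's information and adds the application to the list of applications waiting to run. After registration it sends a RegisteredApplication success message to ClientEndpoint and calls startExecutorsOnWorkers (via schedule()) to run the application. Before running it, the Master must pick the Workers that will run the application; it then sends them LaunchExecutor messages telling them to start Executors. Master.startExecutorsOnWorkers looks like this: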
/**
 * Schedule and launch executors on workers
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  // Apps are served in FIFO order: the application registered first runs first.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    // (keep ALIVE workers with enough free memory and cores, most free cores first)
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Decide which Workers to use and how many cores to assign on each. There are two
    // strategies: spread the app across as many Workers as possible, or pack it onto as few as possible
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // Send LaunchExecutor to the Worker, telling it to start an Executor
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
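The two allocation strategies mentioned in the comment can be illustrated with a simplified core-assignment loop. In Master, `spreadOutApps` comes from the `spark.deploy.spreadOut` setting (true by default). This is a sketch, not Spark's scheduleExecutorsOnWorkers: it hands out one core at a time and ignores memory, coresPerExecutor and executor limits, and `CoreAssignment.assignCores` is a hypothetical name.

```scala
// Hypothetical, simplified version of the idea behind scheduleExecutorsOnWorkers.
object CoreAssignment {
  /** Hand out up to coresWanted cores, one at a time, to workers with free cores.
    * spreadOut = true: round-robin across workers (one core per worker per pass).
    * spreadOut = false: fill each worker before moving on to the next one. */
  def assignCores(coresWanted: Int, freeCores: Array[Int], spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var remaining = coresWanted
    // A worker can take another core while cores are still wanted and it has one left.
    def canLaunch(pos: Int): Boolean = remaining > 0 && freeCores(pos) > assigned(pos)
    var candidates = freeCores.indices.filter(canLaunch)
    while (candidates.nonEmpty) {
      candidates.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          assigned(pos) += 1
          remaining -= 1
          // Spreading out: give this worker one core, then move to the next worker.
          if (spreadOut) keepScheduling = false
        }
      }
      candidates = candidates.filter(canLaunch)
    }
    assigned
  }
}
```

With 6 cores wanted and three Workers with 4 free cores each, spreading out gives [2, 2, 2], while packing gives [4, 2, 0].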
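c) The client receives the registration-success message from the Master and completes Application registration
When StandaloneAppClient.ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registered flag to true. Registration threads that run afterwards see the flag and return without contacting the Master again, which completes Application registration. The RegisteredApplication handling in StandaloneAppClient is: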
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // FIXME How to handle the following cases?
    // 1. A master receives multiple registrations and sends back multiple
    // RegisteredApplications due to an unstable network.
    // 2. Receive multiple RegisteredApplication from different masters because the master is
    // changing.
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
  // ... (other cases omitted)
}
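d) How the Worker launches an Executor
In step b), when Master.startExecutorsOnWorkers allocates resources to run the application, it calls allocateWorkerResourceToExecutors to launch Executors on Workers. When a Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner. When the ExecutorRunner starts, it creates a ProcessBuilder, which uses the application's command to launch a CoarseGrainedExecutorBackend process; this process is the container in which the Executor runs. Finally, the Worker sends an ExecutorStateChanged message to the Master to report that the Executor has been created.
The Worker runs the following code when it receives the launch-executor message: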
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId, {
        val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
        val dirs = localRootDirs.flatMap { dir =>
          try {
            // Create the directory
            val appDir = Utils.createDirectory(dir, namePrefix = "executor")
            // Restrict access to the owner (chmod 700)
            Utils.chmod700(appDir)
            Some(appDir.getAbsolutePath())
          } catch {
            case e: IOException =>
              logWarning(s"${e.getMessage}. Ignoring this directory.")
              None
          }
        }.toSeq
        if (dirs.isEmpty) {
          throw new IOException("No subfolder can be created in " +
            s"${localRootDirs.mkString(",")}.")
        }
        dirs
      })
      appDirectories(appId) = appLocalDirs
      // ExecutorRunner launches CoarseGrainedExecutorBackend using the command in the app
      // description; that command is built in StandaloneSchedulerBackend.start()
      // (named SparkDeploySchedulerBackend before Spark 2.0)
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // Tell the Master that the executor's state is now ExecutorState.RUNNING
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
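The directory handling in the LaunchExecutor branch (a per-executor working directory `workDir/appId/execId`, plus owner-only local directories where failing roots are skipped) can be reproduced with plain java.nio. This sketch assumes a POSIX file system; `ExecutorDirs` is a hypothetical helper, and `Files.createTempDirectory` with an `executor-` prefix stands in for Spark's internal `Utils.createDirectory` and `Utils.chmod700`.

```scala
import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions

// Hypothetical helper mirroring the directory steps of the LaunchExecutor handler.
object ExecutorDirs {
  /** Create workDir/appId/execId, failing loudly like the Worker does. */
  def createExecutorDir(workDir: File, appId: String, execId: Int): File = {
    val dir = new File(workDir, appId + "/" + execId)
    if (!dir.mkdirs()) throw new IOException("Failed to create directory " + dir)
    dir
  }

  /** Create one uniquely named "executor-*" directory under each root and restrict it
    * to the owner (rwx------). Roots that fail are skipped; if none succeeds, throw. */
  def createLocalDirs(roots: Seq[File]): Seq[String] = {
    val dirs = roots.flatMap { root =>
      try {
        val dir = Files.createTempDirectory(root.toPath, "executor-")
        Files.setPosixFilePermissions(dir, PosixFilePermissions.fromString("rwx------"))
        Some(dir.toAbsolutePath.toString)
      } catch {
        case _: IOException => None // e.g. the root does not exist
      }
    }
    if (dirs.isEmpty) throw new IOException("No subfolder can be created in " + roots.mkString(","))
    dirs
  }
}
```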
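ExecutorRunner.start() runs fetchAndRunExecutor on a separate thread. The command used there is defined in StandaloneSchedulerBackend (SparkDeploySchedulerBackend before Spark 2.0) and specifies CoarseGrainedExecutorBackend as the container the Executor runs in. The creation process in ExecutorRunner.fetchAndRunExecutor() is: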
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    // Build a ProcessBuilder from the application's command and the environment settings
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")
    // Set the working directory and the SPARK_EXECUTOR_DIRS environment variable
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // Start the process, which runs CoarseGrainedExecutorBackend
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    // stderr starts with a header containing the launch command
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code. When it exits, report the exit status to the Worker.
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
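fetchAndRunExecutor is essentially the standard java.lang.ProcessBuilder workflow: build the command, set the working directory and environment variables, start the process, capture its output, and block in waitFor(). Below is a minimal sketch of that workflow; `ProcessLaunchSketch.launch` is hypothetical, it uses ProcessBuilder's built-in file redirects instead of Spark's FileAppender, and the usage check assumes a POSIX `sh` is available.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import scala.jdk.CollectionConverters._

// Hypothetical sketch of the launch-and-wait steps in fetchAndRunExecutor.
object ProcessLaunchSketch {
  /** Run `command` in `workDir` with extra environment variables, send stdout to
    * workDir/stdout and stderr to workDir/stderr (after a header line), and block
    * until the process exits. Returns the exit code. */
  def launch(command: Seq[String], workDir: File, env: Map[String, String]): Int = {
    val builder = new ProcessBuilder(command.asJava)
    builder.directory(workDir)
    env.foreach { case (k, v) => builder.environment.put(k, v) }
    // Write the header first, then let the child append its stderr after it.
    val stderr = new File(workDir, "stderr")
    val header = "Command: " + command.mkString("\"", "\" \"", "\"") + "\n" + "=" * 40 + "\n\n"
    Files.write(stderr.toPath, header.getBytes(StandardCharsets.UTF_8))
    builder.redirectOutput(new File(workDir, "stdout"))
    builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))
    val process = builder.start()
    process.waitFor() // the Worker would report this code in ExecutorStateChanged
  }
}
```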
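e) The Master receives the Worker's message that the Executor has been launched
When the Master receives an ExecutorStateChanged message from a Worker, it updates the executor's state and forwards an ExecutorUpdated message to the application's driver. If the new state is a finished one, it also removes the executor from its Worker and its application, and removes the whole application when too many executors have failed and none is still running. Finally it calls schedule().
Class: Master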
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state
      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward an ExecutorUpdated message to the application's driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      if (ExecutorState.isFinished(state)) {
        // Remove this executor from the worker and app
        logInfo(s"Removing executor ${exec.fullId} because it is $state")
        // If an application has already finished, preserve its
        // state to display its information properly on the UI
        if (!appInfo.isFinished) {
          appInfo.removeExecutor(exec)
        }
        exec.worker.removeExecutor(exec)
        val normalExit = exitStatus == Some(0)
        // Only retry certain number of times so we don't go into an infinite loop.
        // Important note: this code path is not exercised by tests, so be very careful when
        // changing this `if` condition.
        if (!normalExit
            && appInfo.incrementRetryCount() >= MAX_EXECUTOR_RETRIES
            && MAX_EXECUTOR_RETRIES >= 0) { // < 0 disables this application-killing path
          val execs = appInfo.executors.values
          if (!execs.exists(_.state == ExecutorState.RUNNING)) {
            logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
              s"${appInfo.retryCount} times; removing it")
            removeApplication(appInfo, ApplicationState.FAILED)
          }
        }
      }
      schedule()
    case None =>
      logWarning(s"Got status update for unknown executor $appId/$execId")
  }
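The retry guard at the end of this handler is easy to misread, so here it is in isolation. This sketch assumes a single application whose executors report exit codes; `AppRetryTracker` is a hypothetical class. The limit plays the role of MAX_EXECUTOR_RETRIES, which Master reads from `spark.deploy.maxExecutorRetries` (default 10).

```scala
// Hypothetical tracker mirroring the retry guard in Master's
// ExecutorStateChanged handler for a single application.
final class AppRetryTracker(maxExecutorRetries: Int) {
  private var retryCount = 0

  def retries: Int = retryCount

  // An executor reached RUNNING: like appInfo.resetRetryCount()
  def executorRunning(): Unit = retryCount = 0

  /** An executor reached a finished state. Returns true when the application
    * should be removed as FAILED. */
  def executorFinished(exitStatus: Option[Int], anyExecutorRunning: Boolean): Boolean = {
    val normalExit = exitStatus.contains(0)
    if (normalExit) false
    else {
      // Like appInfo.incrementRetryCount(): only abnormal exits are counted.
      retryCount += 1
      // A negative limit disables removal; the app is only removed if none of
      // its executors is still RUNNING.
      maxExecutorRetries >= 0 && retryCount >= maxExecutorRetries && !anyExecutorRunning
    }
  }
}
```

A normal exit (code 0) never counts as a failure, a transition to RUNNING resets the counter, and a negative limit disables removal entirely.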
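f) Once started, the Executor sends its information to the Driver; the Driver replies with an acknowledgement and sends LaunchTask messages to run tasks.
In its onStart method, CoarseGrainedExecutorBackend sends a RegisterExecutor message to the DriverEndpoint. On the Driver side, if the Executor is already registered, a RegisterExecutorFailed message is sent back; otherwise the Driver records the Executor's information, sends a RegisteredExecutor success message, allocates resources for tasks in makeOffers(), and finally sends LaunchTask messages to run them.
The Driver-side Executor registration works as follows:
Class: CoarseGrainedSchedulerBackend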
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // If the executor's rpc env is not listening for incoming connections, `hostPort`
    // will be null, and the client connection should be used to contact the executor.
    val executorAddress = if (executorRef.address != null) {
        executorRef.address
      } else {
        context.senderAddress
      }
    logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
    // Record the executor
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    // (map the executor ID to its details)
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Tell the executor it is registered, and post an executor-added event on the listener bus
    executorRef.send(RegisteredExecutor)
    // Note: some tests expect the reply to come after we put the executor in the map
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Offer resources and send LaunchTask messages to run tasks
    makeOffers()
  }
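The bookkeeping in the RegisterExecutor branch boils down to: reject duplicate executor IDs, otherwise record the executor and update the core and executor counters under a lock. A minimal sketch; `ExecutorRegistry` and `ExecutorInfo` are hypothetical simplifications of the backend's executorDataMap and ExecutorData, and the RPC replies are replaced by an Either result.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical, simplified stand-in for ExecutorData.
final case class ExecutorInfo(host: String, totalCores: Int, freeCores: Int)

// Hypothetical driver-side registry mirroring the RegisterExecutor handling.
final class ExecutorRegistry {
  private val executorDataMap = mutable.HashMap.empty[String, ExecutorInfo]
  val totalCoreCount = new AtomicInteger(0)
  val totalRegisteredExecutors = new AtomicInteger(0)

  /** Left(error) plays the role of RegisterExecutorFailed, Right(()) of RegisteredExecutor. */
  def register(executorId: String, host: String, cores: Int): Either[String, Unit] =
    synchronized {
      if (executorDataMap.contains(executorId)) Left("Duplicate executor ID: " + executorId)
      else {
        // A new executor starts with all of its cores free.
        executorDataMap.put(executorId, ExecutorInfo(host, cores, cores))
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.incrementAndGet()
        Right(())
      }
    }

  def get(executorId: String): Option[ExecutorInfo] = synchronized(executorDataMap.get(executorId))
}
```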
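g) After receiving its registration-success message, the Executor sends heartbeats to the Driver and waits for tasks
When CoarseGrainedExecutorBackend receives the RegisteredExecutor message, it instantiates an Executor object inside the CoarseGrainedExecutorBackend container. From then on, the Executor periodically sends heartbeats to the Driver and waits for task messages from the Driver.
Class: CoarseGrainedExecutorBackend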
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor from the environment settings; it is what actually runs tasks in Spark
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {
    case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  }
In the Executor class, heartbeats are sent to the Driver periodically while the Executor waits for the Driver to hand out tasks:
// Executor for the heartbeat task.
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
/**
 * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
 */
private def startDriverHeartbeater(): Unit = {
  // Heartbeat interval, 10s by default
  val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  // Send heartbeats to the Driver at a fixed rate
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
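startDriverHeartbeater follows a common pattern: a single daemon thread, a fixed-rate schedule, and a random initial delay so that executors started together do not heartbeat in lockstep. Below is a sketch of that pattern with plain java.util.concurrent; `HeartbeatSketch.start` and its `report` callback are hypothetical, and the callback stands in for reportHeartBeat().

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Hypothetical helper illustrating the startDriverHeartbeater pattern;
// `report` stands in for Executor.reportHeartBeat().
object HeartbeatSketch {
  def start(intervalMs: Long, report: () => Unit): ScheduledExecutorService = {
    // Single daemon thread, so it never keeps the JVM alive on its own.
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, "driver-heartbeater")
      t.setDaemon(true)
      t
    }
    val heartbeater = Executors.newSingleThreadScheduledExecutor(factory)
    // First beat after intervalMs plus a random jitter in [0, intervalMs).
    val initialDelay = intervalMs + (math.random() * intervalMs).toLong
    val heartbeatTask = new Runnable {
      // Swallow exceptions: a fixed-rate task that throws is never run again.
      override def run(): Unit =
        try report() catch { case e: Exception => e.printStackTrace() }
    }
    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
    heartbeater
  }
}
```

Catching exceptions inside the task matters because ScheduledExecutorService suppresses all later runs of a fixed-rate task once one run throws.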
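h) Running tasks
Once the Executor in CoarseGrainedExecutorBackend has started, it receives LaunchTask messages from the Driver; running a task is implemented in Executor.launchTask. It creates a TaskRunner (a Runnable executed by a thread from the Executor's pool) that processes the task. When processing finishes, the TaskRunner reports back through CoarseGrainedExecutorBackend's statusUpdate method, which sends a StatusUpdate message to the Driver.
CoarseGrainedExecutorBackend starts a task through the Executor's launchTask: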
case LaunchTask(data) =>
  if (executor == null) {
    // The Executor was not created successfully: log the error and exit
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Start a TaskRunner to run the task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
Executor.launchTask creates a TaskRunner, records it in runningTasks, and hands it to threadPool, where the Executor schedules it:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
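The launchTask/TaskRunner pair is the classic "register, then submit to a pool" pattern. Below is a sketch of it; `MiniExecutor`, `StatusSink` and the string states are hypothetical simplifications (tasks are plain closures rather than serialized Task objects). As in Spark, the runner reports RUNNING before executing the task and a final state afterwards, and the task is removed from runningTasks when it completes.

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Executors}

// Hypothetical stand-in for ExecutorBackend.statusUpdate.
trait StatusSink {
  def statusUpdate(taskId: Long, state: String): Unit
}

// Hypothetical, heavily simplified Executor.
final class MiniExecutor(backend: StatusSink) {
  private val threadPool: ExecutorService = Executors.newCachedThreadPool()
  // taskId -> runner, like Executor.runningTasks
  val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()

  final class TaskRunner(val taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit = {
      backend.statusUpdate(taskId, "RUNNING")
      val finalState =
        try { body(); "FINISHED" }
        catch { case _: Exception => "FAILED" }
      // Deregister before reporting the final state.
      runningTasks.remove(taskId)
      backend.statusUpdate(taskId, finalState)
    }
  }

  // Same shape as Executor.launchTask: register the runner, then hand it to the pool.
  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new TaskRunner(taskId, body)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def stop(): Unit = threadPool.shutdown()
}
```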
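The task execution process and fetching of the results.
i) After a task finishes
When a TaskRunner finishes a task, a status-change message is sent to the Driver. On receiving it, the Driver calls TaskSchedulerImpl.statusUpdate, which handles the task according to its outcome, and then assigns new tasks to that Executor. The Driver-side handling of the status change is:
Class: CoarseGrainedSchedulerBackend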
case StatusUpdate(executorId, taskId, state, data) =>
  // Call TaskSchedulerImpl.statusUpdate() to handle the task according to its outcome
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      // The task has finished (in any final state): give its CPUs back to this
      // executor and offer it new tasks
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None =>
        // Ignoring the update since we don't know about the executor.
        logWarning(s"Ignored task status update ($taskId state $state) " +
          s"from unknown executor with ID $executorId")
    }
  }
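Looking closely at the code above, you can see that the Driver-side handlers are in fact methods of the CoarseGrainedSchedulerBackend class (in its inner DriverEndpoint).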
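The freeCores accounting in the StatusUpdate branch can be sketched as follows. The sketch assumes FIFO assignment of pending tasks to a single executor, whereas the real makeOffers asks TaskSchedulerImpl.resourceOffers to choose tasks (taking locality into account). `OfferSketch`, `ExecutorSlot` and `onTaskFinished` are hypothetical; `CpusPerTask` plays the role of scheduler.CPUS_PER_TASK, which comes from `spark.task.cpus` (default 1).

```scala
import scala.collection.mutable

// Hypothetical sketch of the freeCores bookkeeping around makeOffers.
object OfferSketch {
  // Plays the role of scheduler.CPUS_PER_TASK (spark.task.cpus, default 1).
  val CpusPerTask = 1

  // Hypothetical, simplified stand-in for ExecutorData.
  final class ExecutorSlot(val id: String, var freeCores: Int)

  /** Launch pending tasks on `exec` in FIFO order while it has enough free
    * cores, reserving cpusPerTask cores per task. Returns the launched IDs. */
  def makeOffers(exec: ExecutorSlot, pending: mutable.Queue[Long],
                 cpusPerTask: Int = CpusPerTask): List[Long] = {
    val launched = List.newBuilder[Long]
    while (pending.nonEmpty && exec.freeCores >= cpusPerTask) {
      launched += pending.dequeue()
      exec.freeCores -= cpusPerTask
    }
    launched.result()
  }

  /** StatusUpdate with a finished state: return the task's cores, then offer again. */
  def onTaskFinished(exec: ExecutorSlot, pending: mutable.Queue[Long],
                     cpusPerTask: Int = CpusPerTask): List[Long] = {
    exec.freeCores += cpusPerTask
    makeOffers(exec, pending, cpusPerTask)
  }
}
```

With 2 free cores and three pending tasks, the first offer launches two tasks; each finished task frees a core that is immediately reused for the next pending task.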
Execution flow of the classes and methods in the code: