How is the SchedulerBackend implemented in Yarn mode?
Which SchedulerBackend does Yarn mode actually use?
When SparkContext creates the SchedulerBackend, the "master" parameter decides which one gets built. For a master of "yarn", the deploy mode then makes the choice: cluster mode creates a YarnClusterSchedulerBackend, while client mode creates a YarnClientSchedulerBackend.
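The dispatch itself lives in the yarn module's YarnClusterManager, an ExternalClusterManager registered for the "yarn" master. A minimal sketch of that selection logic, following the Spark 2.x/3.x class names but simplified rather than quoted verbatim:

// Sketch: how the yarn cluster manager picks the backend (simplified, not the exact source).
// Assumes the usual Spark classes: SparkContext, TaskScheduler, TaskSchedulerImpl, SparkException.
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }
}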
Let's first look at the code structure of YarnClusterSchedulerBackend.
YarnClusterSchedulerBackend extends YarnSchedulerBackend and adds very little of its own, so let's go straight to YarnSchedulerBackend; the client-mode backend is presumably much the same.
YarnSchedulerBackend in turn extends CoarseGrainedSchedulerBackend, so let's see where they differ.
It overrides the doRequestTotalExecutors and doKillExecutors methods: one requests executors, the other kills them.
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
  yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
}

override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
  yarnSchedulerEndpointRef.ask[Boolean](KillExecutors(executorIds))
}
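For reference, the two messages being sent here are plain case classes from CoarseGrainedClusterMessages. In the version this excerpt comes from they look roughly like this (a sketch; the field lists change across Spark versions, but they match the fields used by the ApplicationMaster handler further below):

case class RequestExecutors(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String])
  extends CoarseGrainedClusterMessage

case class KillExecutors(executorIds: Seq[String])
  extends CoarseGrainedClusterMessage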
yarnSchedulerEndpointRef points to the YarnSchedulerEndpoint defined in the same file. Let's see what its handling code actually does:
case r: RequestExecutors =>
  amEndpoint match {
    case Some(am) =>
      am.ask[Boolean](r).andThen {
        case Success(b) => context.reply(b)
        case Failure(NonFatal(e)) =>
          logError(s"Sending $r to AM was unsuccessful", e)
          context.sendFailure(e)
      }(ThreadUtils.sameThread)
  }

case k: KillExecutors =>
  amEndpoint match {
    case Some(am) =>
      am.ask[Boolean](k).andThen {
        case Success(b) => context.reply(b)
        case Failure(NonFatal(e)) =>
          logError(s"Sending $k to AM was unsuccessful", e)
          context.sendFailure(e)
      }(ThreadUtils.sameThread)
  }
We can see that it simply forwards the message to amEndpoint, that is, to the ApplicationMaster in the yarn module. So now we have to jump over to ApplicationMaster to see its logic; quite a detour.
How does ApplicationMaster handle the RequestExecutors and KillExecutors messages?
case r: RequestExecutors =>
  Option(allocator) match {
    case Some(a) =>
      if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
          r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) {
        resetAllocatorInterval()
      }
      context.reply(true)
  }

case KillExecutors(executorIds) =>
  Option(allocator) match {
    case Some(a) => executorIds.foreach(a.killExecutor)
  }
  context.reply(true)
So it calls the allocator's killExecutor and requestTotalExecutorsWithPreferredLocalities methods. What is allocator, then? We are going through quite a few classes here...
allocator = client.createAllocator(
  yarnConf,
  _sparkConf,
  appAttemptId,
  driverUrl,
  driverRef,
  securityMgr,
  localResources)
It is created by client's createAllocator method. And what is client? It is a YarnRMClient, so we need to look at YarnRMClient first. As the name suggests, YarnRMClient is the client that talks to the YARN ResourceManager to request executors and to kill them.
createAllocator returns the following YarnAllocator:
new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, appAttemptId,
  securityMgr, localResources, SparkRackResolver.get(conf))
Which brings us to YarnAllocator.
YarnAllocator's killExecutor method is easy to understand: it simply releases the corresponding container in Yarn:
def killExecutor(executorId: String): Unit = synchronized {
  executorIdToContainer.get(executorId) match {
    case Some(container) if !releasedContainers.contains(container.getId) =>
      internalReleaseContainer(container)
      runningExecutors.remove(executorId)
    case _ => logWarning(s"Attempted to kill unknown executor $executorId!")
  }
}
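The request side starts with requestTotalExecutorsWithPreferredLocalities, which the ApplicationMaster called above. It does not allocate anything itself; it only records the new target. Roughly, as a sketch against the YarnAllocator fields of this Spark version (exact names vary between releases):

// Sketch of YarnAllocator.requestTotalExecutorsWithPreferredLocalities (simplified).
// numLocalityAwareTasks, hostToLocalTaskCounts, targetNumExecutors and
// allocatorBlacklistTracker are fields of YarnAllocator.
def requestTotalExecutorsWithPreferredLocalities(
    requestedTotal: Int,
    localityAwareTasks: Int,
    hostToLocalTaskCount: Map[String, Int],
    nodeBlacklist: Set[String]): Boolean = synchronized {
  this.numLocalityAwareTasks = localityAwareTasks
  this.hostToLocalTaskCounts = hostToLocalTaskCount
  if (requestedTotal != targetNumExecutors) {
    targetNumExecutors = requestedTotal              // just remember the new target
    allocatorBlacklistTracker.setSchedulerBlacklistedNodes(nodeBlacklist)
    true                                             // target changed
  } else {
    false
  }
}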
Actually starting executors ultimately happens in the runAllocatedContainers method.
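How does the new target turn into containers? The ApplicationMaster runs a reporter thread that periodically calls allocator.allocateResources(), which heartbeats to the ResourceManager and hands any newly granted containers on to runAllocatedContainers. A simplified sketch of that loop (details differ across Spark versions):

// Sketch of YarnAllocator.allocateResources (simplified).
// amClient is the Hadoop AMRMClient held by YarnAllocator; asScala needs
// scala.collection.JavaConverters._ in scope.
def allocateResources(): Unit = synchronized {
  updateResourceRequests()                        // add/cancel ContainerRequests so that
                                                  // pending + running matches targetNumExecutors
  val allocateResponse = amClient.allocate(0.1f)  // heartbeat to the ResourceManager
  val allocatedContainers = allocateResponse.getAllocatedContainers()
  if (allocatedContainers.size > 0) {
    handleAllocatedContainers(allocatedContainers.asScala)  // match containers to requests,
                                                            // then call runAllocatedContainers
  }
}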
Let's look at the core code; see the source for the full version:
if (runningExecutors.size() < targetNumExecutors) {
  numExecutorsStarting.incrementAndGet()
  if (launchContainers) {
    launcherPool.execute(() => {
      try {
        new ExecutorRunnable(
          Some(container),
          conf,
          sparkConf,
          driverUrl,
          executorId,
          executorHostname,
          executorMemory,
          executorCores,
          appAttemptId.getApplicationId.toString,
          securityMgr,
          localResources
        ).run()
        updateInternalState()
      } catch {
        // error handling omitted in this excerpt
      }
    })
  }
It keeps launching ExecutorRunnables until targetNumExecutors executors are running, which lines up nicely with how Standalone mode requests its executors. And that is the whole flow.
In the end, the required number of containers are requested from the Yarn cluster, and an executor is started inside each container, which then reports back to the Driver.
The ExecutorRunnable here is what launches the YarnCoarseGrainedExecutorBackend process inside the container, as you can see in the ExecutorRunnable class.
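Concretely, ExecutorRunnable builds the ContainerLaunchContext and asks the NodeManager to start the container; the launch command it assembles looks roughly like the sketch below (simplified; the real prepareCommand also adds JVM options, the classpath, log redirection, and in newer versions extra flags such as --resourceProfileId):

// Sketch of the command ExecutorRunnable assembles for the container (not the exact source).
// Environment is org.apache.hadoop.yarn.api.ApplicationConstants.Environment; the other
// values are the constructor parameters shown in the excerpt above.
val commands = Seq(
  Environment.JAVA_HOME.$$() + "/bin/java", "-server",
  "-Xmx" + executorMemory + "m",
  "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
  "--driver-url", driverUrl,
  "--executor-id", executorId,
  "--hostname", executorHostname,
  "--cores", executorCores.toString,
  "--app-id", appId)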
Read the source a few more times; once you have really understood it, you will feel...