Spark Source Code Series (7): How Spark on YARN Is Implemented
I wasn't planning to write this one, but I honestly have nothing better to do, and binge-watching American TV shows all day gets boring. This chapter covers the implementation of Spark on YARN. It is already stable in 1.0.0, yet 1.0.1 came out only a month after 1.0.0 was released; the pace is hard to keep up with. I am still walking through the 1.0.0 code here, so please stop asking which version I am covering: every article published so far is based on the 1.0.0 code.
Back in chapter one, "The spark-submit job submission process", we saw that in cluster mode the main class for Spark on YARN is org.apache.spark.deploy.yarn.Client. Okay, that is our first target.
Submitting the Job
Find the main function; it calls the run method, so let's look at run directly.
val appId = runApp()
monitorApplication(appId)
System.exit(0)
Run the app, monitor the app, then exit. Let's look at runApp first.
def runApp(): ApplicationId = {
  // Validate the arguments: memory must be at least 384 MB and there must be at least one executor.
  validateArgs()
  // These two are parent-class methods; they initialize and start the Client.
  init(yarnConf)
  start()
  // Log cluster details (e.g. the number of NodeManagers, queue information).
  logClusterResourceDetails()
  // Prepare to submit a request to the ResourceManager (specifically its ApplicationsManager (ASM)).
  // Get a new client application.
  val newApp = super.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  val appId = newAppResponse.getApplicationId()
  // Check that the cluster has enough memory for this job.
  verifyClusterResources(newAppResponse)
  // Prepare resources and environment variables.
  // 1. Get the staging directory: /.sparkStaging/appId/
  val appStagingDir = getAppStagingDir(appId)
  // 2. Create the staging directory, set its permissions, and upload the jars needed at runtime.
  val localResources = prepareLocalResources(appStagingDir)
  // 3. Set up the environment variables needed at runtime.
  val launchEnv = setupLaunchEnv(localResources, appStagingDir)
  // 4. Set the runtime JVM options; if SPARK_USE_CONC_INCR_GC is true, the CMS garbage collector is used.
  val amContainer = createContainerLaunchContext(newAppResponse, localResources, launchEnv)
  // Set up the application submission context.
  val appContext = newApp.getApplicationSubmissionContext()
  appContext.setApplicationName(args.appName)
  appContext.setQueue(args.amQueue)
  appContext.setAMContainerSpec(amContainer)
  appContext.setApplicationType("SPARK")
  // Set the ApplicationMaster's memory. Resource is the class that represents resources; for now that means CPU and memory.
  val memoryResource = Records.newRecord(classOf[Resource]).asInstanceOf[Resource]
  memoryResource.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
  appContext.setResource(memoryResource)
  // Submit the application.
  submitApp(appContext)
  appId
}
There is not much to say about monitorApplication: it keeps calling getApplicationReport to fetch the latest report, reads the current state via getYarnApplicationState, and exits once the state is FINISHED, FAILED, or KILLED.
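As a minimal sketch of that polling loop (it lives inside Client, which extends YARN's client class; the state checks use the YARN API, but the loop structure and interval here are paraphrased rather than copied from the 1.0.0 source):
def monitorApplication(appId: ApplicationId): Boolean = {
  while (true) {
    Thread.sleep(1000)                                // illustrative poll interval
    val report = super.getApplicationReport(appId)    // ask the ResourceManager for the latest report
    val state = report.getYarnApplicationState()
    if (state == YarnApplicationState.FINISHED ||
        state == YarnApplicationState.FAILED ||
        state == YarnApplicationState.KILLED) {
      return true                                     // terminal state reached, stop monitoring
    }
  }
  true
}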
While we are at it, here are the YARN-related arguments, so everything is clear at a glance.
while (!args.isEmpty) {
  args match {
    case ("--jar") :: value :: tail =>
      userJar = value
      args = tail
    case ("--class") :: value :: tail =>
      userClass = value
      args = tail
    case ("--args" | "--arg") :: value :: tail =>
      if (args(0) == "--args") {
        println("--args is deprecated. Use --arg instead.")
      }
      userArgsBuffer += value
      args = tail
    case ("--master-class" | "--am-class") :: value :: tail =>
      if (args(0) == "--master-class") {
        println("--master-class is deprecated. Use --am-class instead.")
      }
      amClass = value
      args = tail
    case ("--master-memory" | "--driver-memory") :: MemoryParam(value) :: tail =>
      if (args(0) == "--master-memory") {
        println("--master-memory is deprecated. Use --driver-memory instead.")
      }
      amMemory = value
      args = tail
    case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
      if (args(0) == "--num-workers") {
        println("--num-workers is deprecated. Use --num-executors instead.")
      }
      numExecutors = value
      args = tail
    case ("--worker-memory" | "--executor-memory") :: MemoryParam(value) :: tail =>
      if (args(0) == "--worker-memory") {
        println("--worker-memory is deprecated. Use --executor-memory instead.")
      }
      executorMemory = value
      args = tail
    case ("--worker-cores" | "--executor-cores") :: IntParam(value) :: tail =>
      if (args(0) == "--worker-cores") {
        println("--worker-cores is deprecated. Use --executor-cores instead.")
      }
      executorCores = value
      args = tail
    case ("--queue") :: value :: tail =>
      amQueue = value
      args = tail
    case ("--name") :: value :: tail =>
      appName = value
      args = tail
    case ("--addJars") :: value :: tail =>
      addJars = value
      args = tail
    case ("--files") :: value :: tail =>
      files = value
      args = tail
    case ("--archives") :: value :: tail =>
      archives = value
      args = tail
    case Nil =>
      if (userClass == null) {
        printUsageAndExit(1)
      }
    case _ =>
      printUsageAndExit(1, args)
  }
}
ApplicationMaster
Just look at the run method; the main function does nothing but call it...
def run() {
// Set the local directories; by default YARN's YARN_LOCAL_DIRS is tried first, then LOCAL_DIRS.
System.setProperty("spark.local.dir", getLocalDirs())
// set the web ui port to be ephemeral for yarn so we don't conflict with
// other spark processes running on the same box
System.setProperty("spark.ui.port", "0")
// when running the AM, the Spark master is always "yarn-cluster"
System.setProperty("spark.master", "yarn-cluster")
// The shutdown hook priority is 30, the same as MapReduce's. It is higher than HDFS's because this hook cleans up the job's staging directory on HDFS.
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
// Configured via yarn.resourcemanager.am.max-attempts, default 2.
// So far it only seems to be used when cleaning up the staging directory.
isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()
// setup AmIpFilter for the SparkUI - do this before we start the UI
// The method's documentation says YARN uses this to protect the UI; it essentially installs an IP filter (proxy) in front of it.
addAmIpFilter()
// Register the ApplicationMaster in an internal list.
ApplicationMaster.register(this)
// Security-related setup; it is disabled by default, which saves us some trouble.
val securityMgr = new SecurityManager(sparkConf)
// Start the driver program.
userThread = startUserClass()
// Wait for the SparkContext to be instantiated, essentially waiting for the spark.driver.port property to be set.
// Once the wait is over, instantiate a YarnAllocationHandler.
waitForSparkContextInitialized()
// Do this after Spark master is up and SparkContext is created so that we can register UI Url.
// Register the current ApplicationMaster with YARN; isFinished must not be true at this point, otherwise the program has already failed.
synchronized {
if (!isFinished) {
registerApplicationMaster()
registered = true
}
}
// Request containers to launch the executors.
allocateExecutors()
// Wait for the program to finish.
userThread.join()
System.exit(0)
}
The run method does five main things:
1. Initialization
2. Start the driver program (see the sketch below)
3. Register the ApplicationMaster
4. Allocate executors
5. Wait for the program to finish
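Step 2 deserves a quick look. Roughly speaking (a simplified sketch rather than the exact source, assuming args.userClass and args.userArgs come from the argument parsing shown earlier), startUserClass loads the user's main class reflectively and invokes its main method on a new thread; that thread is the driver:
def startUserClass(): Thread = {
  // load the class passed via --class and grab its main(Array[String]) method
  val mainMethod = Class.forName(args.userClass, false,
    Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
  val t = new Thread {
    override def run() {
      // running the user's main() here is what "starting the driver" means in yarn-cluster mode
      mainMethod.invoke(null, args.userArgs.toArray)
    }
  }
  t.start()
  t
}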
Let's focus on the executor allocation method.
private def allocateExecutors() {
try {
logInfo("Allocating " + args.numExecutors + " executors.")
// Submit ContainerRequests to the ResourceManager in three flavors: a specific host, a rack, or any machine.
// The number of containers requested may be larger than the number actually needed.
yarnAllocator.addResourceRequests(args.numExecutors)
// Exits the loop if the user thread exits.
while (yarnAllocator.getNumExecutorsRunning < args.numExecutors && userThread.isAlive) {
if (yarnAllocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finishApplicationMaster(FinalApplicationStatus.FAILED, "max number of executor failures reached")
}
// Assign the resources that came back and release any surplus.
yarnAllocator.allocateResources()
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
} finally {
// In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
// so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
}
logInfo("All executors have launched.")
// Start a thread to report progress.
if (userThread.isAlive) {
// Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
val schedulerInterval = sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
// must be <= timeoutInterval / 2.
val interval = math.min(timeoutInterval / 2, schedulerInterval)
launchReporterThread(interval)
}
}
Within it we only need to look at the addResourceRequests and allocateResources methods.
Let's start with addResourceRequests; I won't paste its code here.
The client submits container requests to the ResourceManager in three types: preferred hosts, hosts on the same rack, and any host.
Preferred hosts are the locations returned by getPreferredLocations on the RDD; if there are no preferred hosts, the same-rack case does not apply either, and any host will do.
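To give a feel for how these three request types map onto the YARN API, here is a simplified illustration (this is not the actual addResourceRequests body; preferredHost, rackOfHost, executorMemory, executorCores, and amClient stand in for fields the handler keeps):
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records

// resource shape of one executor container (memory includes the 384 MB overhead)
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
capability.setVirtualCores(executorCores)
val priority = Records.newRecord(classOf[Priority])
priority.setPriority(1)

// host-local: ask for a container on a specific preferred host
val hostRequest = new ContainerRequest(capability, Array(preferredHost), null, priority)
// rack-local: ask for a container anywhere on that host's rack
val rackRequest = new ContainerRequest(capability, null, Array(rackOfHost), priority)
// off-rack: no locality constraint at all
val anyRequest = new ContainerRequest(capability, null, null, priority)

Seq(hostRequest, rackRequest, anyRequest).foreach(amClient.addContainerRequest)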
Next, let's move on to the allocateResources method.
def allocateResources() {
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
// The container requests were already submitted; now we only need to fetch the response.
val progressIndicator = 0.1f
val allocateResponse = amClient.allocate(progressIndicator)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
if (numPendingAllocateNow < 0) {
numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
}
val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
for (container <- allocatedContainers) {
// The container's memory must be > the executor memory + 384 MB.
if (isResourceConstraintSatisfied(container)) {
// Put the container on the roster, to be dealt with below.
val host = container.getNodeId.getHost
val containersForHost = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
containersForHost += container
} else {
// Not enough memory; release it.
releaseContainer(container)
}
}
// Pick suitable containers to use.
val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
// Iterate over all the hosts.
for (candidateHost <- hostToContainers.keySet) {
val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
val remainingContainersOpt = hostToContainers.get(candidateHost)
var remainingContainers = remainingContainersOpt.get
if (requiredHostCount >= remainingContainers.size) {
// More are needed than we have, so add all of these data-local containers to the dataLocalContainers map.
dataLocalContainers.put(candidateHost, remainingContainers)
// No containers left over.
remainingContainers = null
} else if (requiredHostCount > 0) {
// We got more containers than needed; release the surplus.
val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
dataLocalContainers.put(candidateHost, dataLocal)
for (container <- remaining) releaseContainer(container)
remainingContainers = null
}
// The hosts holding the data are already fully booked, so pick machines in the same rack instead.
if (remainingContainers != null) {
val rack = YarnAllocationHandler.lookupRack(conf, candidateHost)
if (rack != null) {
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
rackLocalContainers.getOrElse(rack, List()).size
if (requiredRackCount >= remainingContainers.size) {
// Add all remaining containers to `dataLocalContainers`.
dataLocalContainers.put(rack, remainingContainers)
remainingContainers = null
} else if (requiredRackCount > 0) {
// Container list has more containers than we need for data locality.
val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount)
val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]())
existingRackLocal ++= rackLocal
remainingContainers = remaining
}
}
}
if (remainingContainers != null) {
// Still not enough; they will have to run on machines in other racks.
offRackContainers.put(candidateHost, remainingContainers)
}
}
// Order the containers: data-local hosts first, then same-rack, then anywhere.
val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
// Iterate over the selected containers and start an ExecutorRunnable thread for each one, dedicated to sending it commands.
for (container <- allocatedContainersToProcess) {
val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
// The memory must be greater than the executor memory + 384 MB.
val executorMemoryOverhead = (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
if (numExecutorsRunningNow > maxExecutors) {
// More executors are running than needed; release the extra container.
releaseContainer(container)
numExecutorsRunning.decrementAndGet()
} else {
val executorId = executorIdCounter.incrementAndGet().toString
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
sparkConf.get("spark.driver.host"),
sparkConf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
// To be safe, remove the container from `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
// Record the container in the allocated host and rack maps.
val rack = YarnAllocationHandler.lookupRack(conf, executorHostname)
allocatedHostToContainersMap.synchronized {
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId]())
containerSet += containerId
allocatedContainerToHostMap.put(containerId, executorHostname)
if (rack != null) {
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
// Start a thread that looks after this container and sends it the command to run the executor.
val executorRunnable = new ExecutorRunnable(
container,
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores)
new Thread(executorRunnable).start()
}
}
}
1. The containers obtained from the ResourceManager are selected in the order of the three categories introduced earlier: preferred hosts > same-rack hosts > any host.
2. Once a container is selected, a dedicated ExecutorRunnable is started for it one-to-one, which sends it the command to run CoarseGrainedExecutorBackend.
3. ExecutorRunnable sends that request to the NodeManager through NMClient (sketched below).
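As a rough illustration of point 3, here is a condensed, hypothetical sketch of that interaction (the real ExecutorRunnable also prepares local resources and environment variables, and the launch command below is abbreviated):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
import org.apache.hadoop.yarn.client.api.NMClient
import org.apache.hadoop.yarn.util.Records
import scala.collection.JavaConverters._

class ExecutorRunnableSketch(
    container: Container,
    conf: Configuration,
    driverUrl: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int) extends Runnable {

  override def run(): Unit = {
    // each runnable owns an NMClient that talks to this container's NodeManager
    val nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()

    // abbreviated launch command for CoarseGrainedExecutorBackend
    val command =
      "$JAVA_HOME/bin/java -Xmx" + executorMemory + "m " +
        "org.apache.spark.executor.CoarseGrainedExecutorBackend " +
        s"$driverUrl $executorId $hostname $executorCores " +
        "1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr"

    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    ctx.setCommands(List(command).asJava)

    // ask the NodeManager that owns this container to start it with the context above
    nmClient.startContainer(container, ctx)
  }
}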
Summary:
Getting a job to run on YARN does not involve many classes; the main ones are Client, ApplicationMaster, YarnAllocationHandler, and ExecutorRunnable.
1. Client, as the YARN client, is responsible for asking YARN to start the ApplicationMaster.
2. The ApplicationMaster works like a project manager for the whole job: it requests resources, assigns them, starts the driver and executors, and handles executor launch failures.
3. The ApplicationMaster requests and assigns resources through YarnAllocationHandler.
4. Containers are chosen in the order: preferred hosts > same-rack hosts > any host.
5. ExecutorRunnable is only responsible for sending the container the command to start CoarseGrainedExecutorBackend.
6. Executor failure handling lives in the ApplicationMaster's launchReporterThread method: besides reporting progress, the thread it starts also watches the executors and requests replacements whenever some are found to be lost (see the sketch after this list).
7. The classes under the yarn directory whose names contain YarnClient belong to yarn-client mode; their implementation is much the same as what we covered above.
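For point 6, the reporter thread loop looks roughly like this. It is a paraphrase that keeps only the monitoring idea, reusing names from the allocateExecutors code above; note that the real code also subtracts the requests still pending so it does not over-ask:
val reporterThread = new Thread {
  override def run() {
    while (userThread.isAlive) {
      // executors we expected but that are no longer running (simplified; pending requests are ignored here)
      val missingExecutors = args.numExecutors - yarnAllocator.getNumExecutorsRunning
      if (missingExecutors > 0) {
        // ask the ResourceManager for replacements for the lost executors
        yarnAllocator.addResourceRequests(missingExecutors)
      }
      // the allocate call doubles as the heartbeat that keeps the AM from expiring
      yarnAllocator.allocateResources()
      Thread.sleep(interval)
    }
  }
}
reporterThread.setDaemon(true)
reporterThread.start()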
The rest is mostly about using YARN's client APIs, which I am not that familiar with myself; I can just about follow along and get the general idea, haha.