《深入理解SPARK:核心思想與原始碼分析》——SparkContext的初始化(仲篇)——SparkUI、環境變數及排程
《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》
《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》
由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。
本文展現第3章第二部分的內容:
3.4 SparkUI詳解
任何系統都需要提供監控功能,用瀏覽器能訪問具有樣式及佈局,並提供豐富監控資料的頁面無疑是一種簡單、高效的方式。SparkUI就是這樣的服務,它的構成如圖3-1所示。
在大型分散式系統中,採用事件監聽機制是最常見的。為什麼要使用事件監聽機制?假如SparkUI採用Scala的函式呼叫方式,那麼隨著整個叢集規模的增加,對函式的呼叫會越來越多,最終會受到Driver所在JVM的執行緒數量限制而影響監控資料的更新,甚至出現監控資料無法及時顯示給使用者的情況。由於函式呼叫多數情況下是同步呼叫,這就導致執行緒被阻塞,在分散式環境中,還可能因為網路問題,導致執行緒被長時間佔用。將函式呼叫更換為傳送事件,事件的處理是非同步的,當前執行緒可以繼續執行後續邏輯,執行緒池中的執行緒還可以被重用,這樣整個系統的併發度會大大增加。傳送的事件會存入快取,由定時排程器取出後,分配給監聽此事件的監聽器對監控資料進行更新。
圖3-1 SparkUI架構
我們先將圖3-1中的各個元件作簡單介紹:DAGScheduler是主要的產生各類SparkListenerEvent的源頭,它將各種SparkListenerEvent傳送到listenerBus的事件佇列中,listenerBus通過定時器將SparkListenerEvent事件匹配到具體的SparkListener,改變SparkListener中的統計監控資料,最終由SparkUI的介面展示。從圖3-1中還可以看到Spark裡定義了很多監聽器SparkListener的實現,包括JobProgressListener、EnviromentListener、StorageListener、ExecutorsListener幾種,它們的類繼承體系如圖3-2所示。
圖3-2 SparkListener繼承體系
3.4.1 listenerBus詳解
listenerBus的型別是LiveListenerBus,LiveListenerBus實現了監聽器模型,通過監聽事件觸發對各種監聽器監聽狀態資訊的修改,達到UI介面的資料重新整理效果。LiveListenerBus由以下部分組成:
q 事件阻塞佇列:型別為LinkedBlockingQueue[SparkListenerEvent],固定大小是10000;
q 監聽器陣列:型別為ArrayBuffer[SparkListener],存放各類監聽器SparkListener。SparkListener是;
q 事件匹配監聽器的執行緒:此Thread不斷拉取LinkedBlockingQueue中的事情,遍歷監聽器,呼叫監聽器的方法。任何事件都會在LinkedBlockingQueue中存在一段時間,然後Thread處理了此事件後,會將其清除。因此使用listener bus這個名字再合適不過了,到站就下車。listenerBus的實現,見程式碼清單3-15。
程式碼清單3-15 LiveListenerBus的事件處理實現
private val EVENT_QUEUE_CAPACITY = 10000 private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false // A counter that represents the number of events produced and consumed in the queue private val eventLock = new Semaphore(0) private val listenerThread = new Thread("SparkListenerBus") { setDaemon(true) override def run(): Unit = Utils.logUncaughtExceptions { while (true) { eventLock.acquire() // Atomically remove and process this event LiveListenerBus.this.synchronized { val event = eventQueue.poll if (event == SparkListenerShutdown) { // Get out of the while loop and shutdown the daemon thread return } Option(event).foreach(postToAll) } } } } def start() { if (started) { throw new IllegalStateException("Listener bus already started!") } listenerThread.start() started = true } def post(event: SparkListenerEvent) { val eventAdded = eventQueue.offer(event) if (eventAdded) { eventLock.release() } else { logQueueFullErrorMessage() } } def listenerThreadIsAlive: Boolean = synchronized { listenerThread.isAlive } def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty } def stop() { if (!started) { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") } post(SparkListenerShutdown) listenerThread.join() }
LiveListenerBus中呼叫的postToAll方法實際定義在父類SparkListenerBus中,如程式碼清單3-16所示。
程式碼清單3-16 SparkListenerBus中的監聽器呼叫
protected val sparkListeners = new ArrayBuffer[SparkListener] with mutable.SynchronizedBuffer[SparkListener] def addListener(listener: SparkListener) { sparkListeners += listener } def postToAll(event: SparkListenerEvent) { event match { case stageSubmitted: SparkListenerStageSubmitted => foreachListener(_.onStageSubmitted(stageSubmitted)) case stageCompleted: SparkListenerStageCompleted => foreachListener(_.onStageCompleted(stageCompleted)) case jobStart: SparkListenerJobStart => foreachListener(_.onJobStart(jobStart)) case jobEnd: SparkListenerJobEnd => foreachListener(_.onJobEnd(jobEnd)) case taskStart: SparkListenerTaskStart => foreachListener(_.onTaskStart(taskStart)) case taskGettingResult: SparkListenerTaskGettingResult => foreachListener(_.onTaskGettingResult(taskGettingResult)) case taskEnd: SparkListenerTaskEnd => foreachListener(_.onTaskEnd(taskEnd)) case environmentUpdate: SparkListenerEnvironmentUpdate => foreachListener(_.onEnvironmentUpdate(environmentUpdate)) case blockManagerAdded: SparkListenerBlockManagerAdded => foreachListener(_.onBlockManagerAdded(blockManagerAdded)) case blockManagerRemoved: SparkListenerBlockManagerRemoved => foreachListener(_.onBlockManagerRemoved(blockManagerRemoved)) case unpersistRDD: SparkListenerUnpersistRDD => foreachListener(_.onUnpersistRDD(unpersistRDD)) case applicationStart: SparkListenerApplicationStart => foreachListener(_.onApplicationStart(applicationStart)) case applicationEnd: SparkListenerApplicationEnd => foreachListener(_.onApplicationEnd(applicationEnd)) case metricsUpdate: SparkListenerExecutorMetricsUpdate => foreachListener(_.onExecutorMetricsUpdate(metricsUpdate)) case SparkListenerShutdown => } } private def foreachListener(f: SparkListener => Unit): Unit = { sparkListeners.foreach { listener => try { f(listener) } catch { case e: Exception => logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e) } } }
3.4.2 構造JobProgressListener
我們以JobProgressListener為例來講解SparkListener。JobProgressListener是SparkContext中一個重要的組成部分,通過監聽listenerBus中的事件更新任務進度。SparkStatusTracker和SparkUI實際上也是通過JobProgressListener來實現任務狀態跟蹤的。建立JobProgressListener的程式碼如下。
private[spark] val jobProgressListener = new JobProgressListener(conf) listenerBus.addListener(jobProgressListener) val statusTracker = new SparkStatusTracker(this)
JobProgressListener的作用是通過HashMap、ListBuffer等資料結構儲存JobId及對應的JobUIData資訊,並按照啟用、完成、失敗等job狀態統計。對於StageId、StageInfo等資訊按照啟用、完成、忽略、失敗等stage狀態統計。並且儲存StageId與JobId的一對多關係。這些統計資訊最終會被JobPage和StagePage等頁面訪問和渲染。JobProgressListener的資料結構見程式碼清單3-17。
程式碼清單3-17 JobProgressListener維護的資訊
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { import JobProgressListener._ type JobId = Int type StageId = Int type StageAttemptId = Int type PoolName = String type ExecutorId = String // Jobs: val activeJobs = new HashMap[JobId, JobUIData] val completedJobs = ListBuffer[JobUIData]() val failedJobs = ListBuffer[JobUIData]() val jobIdToData = new HashMap[JobId, JobUIData] // Stages: val activeStages = new HashMap[StageId, StageInfo] val completedStages = ListBuffer[StageInfo]() val skippedStages = ListBuffer[StageInfo]() val failedStages = ListBuffer[StageInfo]() val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData] val stageIdToInfo = new HashMap[StageId, StageInfo] val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]] val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]() var numCompletedStages = 0 // 總共完成的Stage數量 var numFailedStages = 0 / 總共失敗的Stage數量 // Misc: val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]() def blockManagerIds = executorIdToBlockManagerId.values.toSeq var schedulingMode: Option[SchedulingMode] = None // number of non-active jobs and stages (there is no limit for active jobs and stages): val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES) val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
JobProgressListener 實現了onJobStart、onJobEnd、onStageCompleted、onStageSubmitted、onTaskStart、onTaskEnd等方法,這些方法正是在listenerBus的驅動下,改變JobProgressListener中的各種Job、Stage相關的資料。
3.4.3 SparkUI的建立與初始化
建立SparkUI的實現,見程式碼清單3-18。
程式碼清單3-18 SparkUI的宣告
private[spark] val ui: Option[SparkUI] = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener, env.securityManager,appName)) } else { None } ui.foreach(_.bind())
可以看到如果不需要提供SparkUI服務,可以將屬性spark.ui.enabled修改為false。其中createLiveUI實際是呼叫了create方法,見程式碼清單3-19。
程式碼清單3-19 SparkUI的建立
def createLiveUI( sc: SparkContext, conf: SparkConf, listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, jobProgressListener = Some(jobProgressListener)) }
在create方法裡,除了JobProgressListener是外部傳入的之外,又增加了一些SparkListener。例如,用於對JVM引數、Spark屬性、Java系統屬性、classpath等進行監控的EnvironmentListener;用於維護executor的儲存狀態的StorageStatusListener;用於準備將executor的資訊展示在ExecutorsTab的ExecutorsListener;用於準備將executor相關儲存資訊展示在BlockManagerUI的StorageListener等。最後建立SparkUI,參見程式碼清單3-20。
程式碼清單3-20 create方法的實現
private def create( sc: Option[SparkContext], conf: SparkConf, listenerBus: SparkListenerBus, securityManager: SecurityManager, appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) listenerBus.addListener(listener) listener } val environmentListener = new EnvironmentListener val storageStatusListener = new StorageStatusListener val executorsListener = new ExecutorsListener(storageStatusListener) val storageListener = new StorageListener(storageStatusListener) listenerBus.addListener(environmentListener) listenerBus.addListener(storageStatusListener) listenerBus.addListener(executorsListener) listenerBus.addListener(storageListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, appName, basePath) }
SparkUI服務預設是可以被殺掉的,通過修改屬性spark.ui.killEnabled為false可以保證不被殺死。initialize方法,會組織前端頁面各個Tab和Page的展示及佈局,參見程式碼清單3-21。
程式碼清單3-21 SparkUI的初始化
private[spark] class SparkUI private ( val sc: Option[SparkContext], val conf: SparkConf, val securityManager: SecurityManager, val environmentListener: EnvironmentListener, val storageStatusListener: StorageStatusListener, val executorsListener: ExecutorsListener, val jobProgressListener: JobProgressListener, val storageListener: StorageListener, var appName: String, val basePath: String) extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI") with Logging { val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false) /** Initialize all components of the server. */ def initialize() { attachTab(new JobsTab(this)) val stagesTab = new StagesTab(this) attachTab(stagesTab) attachTab(new StorageTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath)) attachHandler( createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest)) } initialize()
3.4.4 SparkUI的頁面佈局及展示
SparkUI究竟是如何實現頁面佈局及展示的?JobsTab展示所有Job的進度、狀態資訊,這裡我們以它為例來說明。JobsTab會複用SparkUI的killEnabled、SparkContext、jobProgressListener,包括AllJobsPage和JobPage兩個頁面,見程式碼清單3-22。
程式碼清單3-22 JobsTab的實現
private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") { val sc = parent.sc val killEnabled = parent.killEnabled def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) val listener = parent.jobProgressListener attachPage(new AllJobsPage(this)) attachPage(new JobPage(this)) }
AllJobsPage由render方法渲染,利用jobProgressListener中的統計監控資料生成啟用、完成、失敗等狀態的Job摘要資訊,並呼叫jobsTable方法生成表格等html元素,最終使用UIUtils的headerSparkPage封裝好css、js、header及頁面佈局等,見程式碼清單3-23。
程式碼清單3-23 AllJobsPage的實現
def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeJobs = listener.activeJobs.values.toSeq val completedJobs = listener.completedJobs.reverse.toSeq val failedJobs = listener.failedJobs.reverse.toSeq val now = System.currentTimeMillis val activeJobsTable = jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse) val completedJobsTable = jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse) val failedJobsTable = jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse) val summary: NodeSeq = <div> <ul class="unstyled"> {if (startTime.isDefined) { // Total duration is not meaningful unless the UI is live <li> <strong>Total Duration: </strong> {UIUtils.formatDuration(now - startTime.get)} </li> }} <li> <strong>Scheduling Mode: </strong> {listener.schedulingMode.map(_.toString).getOrElse("Unknown")} </li> <li> <a href="#active"><strong>Active Jobs:</strong></a> {activeJobs.size} </li> <li> <a href="#completed"><strong>Completed Jobs:</strong></a> {completedJobs.size} </li> <li> <a href="#failed"><strong>Failed Jobs:</strong></a> {failedJobs.size} </li> </ul> </div>
jobsTable用來生成表格資料,見程式碼清單3-24。
程式碼清單3-24 jobsTable處理表格的實現
private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = { val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined) val columns: Seq[Node] = { <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th> <th>Description</th> <th>Submitted</th> <th>Duration</th> <th class="sorttable_nosort">Stages: Succeeded/Total</th> <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th> } <table class="table table-bordered table-striped table-condensed sortable"> <thead>{columns}</thead> <tbody> {jobs.map(makeRow)} </tbody> </table> }
表格中每行資料又是通過makeRow方法渲染的,參見程式碼清單3-25。
程式碼清單3-25 生成表格中的行
def makeRow(job: JobUIData): Seq[Node] = { val lastStageInfo = Option(job.stageIds) .filter(_.nonEmpty) .flatMap { ids => listener.stageIdToInfo.get(ids.max) } val lastStageData = lastStageInfo.flatMap { s => listener.stageIdToData.get((s.stageId, s.attemptId)) } val isComplete = job.status == JobExecutionStatus.SUCCEEDED val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)") val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("") val duration: Option[Long] = { job.startTime.map { start => val end = job.endTime.getOrElse(System.currentTimeMillis()) end - start } } val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown") val detailUrl = "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId) <tr> <td sorttable_customkey={job.jobId.toString}> {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")} </td> <td> <div><em>{lastStageDescription}</em></div> <a href={detailUrl}>{lastStageName}</a> </td> <td sorttable_customkey={job.startTime.getOrElse(-1).toString}> {formattedSubmissionTime} </td> <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td> <td class="stage-progress-cell"> {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages} {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"} {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"} </td> <td class="progress-cell"> {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks, failed = job.numFailedTasks, skipped = job.numSkippedTasks, total = job.numTasks - job.numSkippedTasks)} </td> </tr> }
程式碼清單3-22中的attachPage方法存在於JobsTab的父類WebUITab中,WebUITab維護有ArrayBuffer[WebUIPage]的資料結構,AllJobsPage和JobPage將被放入此ArrayBuffer中,參見程式碼清單3-26。
程式碼清單3-26 WebUITab的實現
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { val pages = ArrayBuffer[WebUIPage]() val name = prefix.capitalize /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */ def attachPage(page: WebUIPage) { page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") pages += page } /** Get a list of header tabs from the parent UI. */ def headerTabs: Seq[WebUITab] = parent.getTabs def basePath: String = parent.getBasePath }
JobsTab建立之後,將被attachTab方法加入SparkUI的ArrayBuffer[WebUITab]中,並且通過attachPage方法,給每一個page生成org.eclipse.jetty.servlet.ServletContextHandler,最後呼叫attachHandler方法將ServletContextHandler繫結到SparkUI,即加入到handlers :ArrayBuffer[ServletContextHandler]和樣例類ServerInfo樣例類的rootHandler(ContextHandlerCollection)中。SparkUI繼承自WebUI,attachTab方法在WebUI中實現,參見程式碼清單3-27。
程式碼清單3-27 WebUI的實現
private[spark] abstract class WebUI( securityManager: SecurityManager, port: Int, conf: SparkConf, basePath: String = "", name: String = "") extends Logging { protected val tabs = ArrayBuffer[WebUITab]() protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None protected val localHostName = Utils.localHostName() protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) private val className = Utils.getFormattedClassName(this) def getBasePath: String = basePath def getTabs: Seq[WebUITab] = tabs.toSeq def getHandlers: Seq[ServletContextHandler] = handlers.toSeq def getSecurityManager: SecurityManager = securityManager /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: WebUITab) { tab.pages.foreach(attachPage) tabs += tab } /** Attach a page to this UI. */ def attachPage(page: WebUIPage) { val pagePath = "/" + page.prefix attachHandler(createServletHandler(pagePath, (request: HttpServletRequest) => page.render(request), securityManager, basePath)) attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json", (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)) } /** Attach a handler to this UI. */ def attachHandler(handler: ServletContextHandler) { handlers += handler serverInfo.foreach { info => info.rootHandler.addHandler(handler) if (!handler.isStarted) { handler.start() } } }
由於程式碼清單3-27所在的類中使用import org.apache.spark.ui.JettyUtils._匯入了JettyUtils的靜態方法,所以createServletHandler方法實際是JettyUtils 的靜態方法createServletHandler。createServletHandler實際建立了javax.servlet.http.HttpServlet的匿名內部類例項,此例項實際使用(request: HttpServletRequest) => page.render(request)這個函式引數來處理請求,進而渲染頁面呈現給使用者。有關createServletHandler的實現,及Jetty的相關資訊,請參閱附錄C。
3.4.5 SparkUI啟動
parkUI建立好後,需要呼叫父類WebUI的bind方法,繫結服務和埠,bind方法中主要的程式碼實現如下。
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf, name))
JettyUtils的靜態方法startJettyServer的實現請參閱附錄C。最終啟動了Jetty提供的服務,預設埠是4040。
3.5 Hadoop相關配置及Executor環境變數
3.5.1 Hadoop相關配置資訊
預設情況下,Spark使用HDFS作為分散式檔案系統,所以需要獲取Hadoop相關配置資訊的程式碼如下。
val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
獲取的配置資訊包括:
q Amazon S3檔案系統AccessKeyId和SecretAccessKey載入到Hadoop的Configuration;
q 將SparkConf中所有spark.hadoop.開頭的屬性都複製到Hadoop的Configuration;
q 將SparkConf的屬性spark.buffer.size複製為Hadoop的Configuration的配置io.file.buffer.size。
注意:如果指定了SPARK_YARN_MODE屬性,則會使用YarnSparkHadoopUtil,否則預設為SparkHadoopUtil。
3.5.2 Executor環境變數
對Executor的環境變數的處理,參見程式碼清單3-28。executorEnvs 包含的環境變數將會在7.2.2節中介紹的註冊應用的過程中傳送給Master,Master給Worker傳送排程後,Worker最終使用executorEnvs提供的資訊啟動Executor。可以通過配置spark.executor.memory指定Executor佔用的記憶體大小,也可以配置系統變數SPARK_EXECUTOR_MEMORY或者SPARK_MEM對其大小進行設定。
程式碼清單3-28 Executor 環境變數的處理
private[spark] val executorMemory = conf.getOption("spark.executor.memory") .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(512) // Environment variables to pass to our executors. private[spark] val executorEnvs = HashMap[String, String]() for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing")) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES") = v } // The Mesos scheduler backend relies on this environment variable to set executor memory. executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" executorEnvs ++= conf.getExecutorEnv // Set SPARK_USER for user who is running SparkContext. val sparkUser = Option { Option(System.getenv("SPARK_USER")).getOrElse(System.getProperty("user.name")) }.getOrElse { SparkContext.SPARK_UNKNOWN_USER } executorEnvs("SPARK_USER") = sparkUser
3.6 建立任務排程器TaskScheduler
TaskScheduler也是SparkContext的重要組成部分,負責任務的提交,並且請求叢集管理器對任務排程。TaskScheduler也可以看做任務排程的客戶端。建立TaskScheduler的程式碼如下。
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
createTaskScheduler方法會根據master的配置匹配部署模式,建立TaskSchedulerImpl,並生成不同的SchedulerBackend。本章為了使讀者更容易理解Spark的初始化流程,故以local模式為例,其餘模式將在第6章詳解。master匹配local模式的程式碼如下。
master match { case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(scheduler, 1) scheduler.initialize(backend) (backend, scheduler)
3.6.1 建立TaskSchedulerImpl
TaskSchedulerImpl的構造過程如下:
1) 從SparkConf中讀取配置資訊,包括每個任務分配的CPU數、排程模式(排程模式有FAIR和FIFO兩種,預設為FIFO,可以修改屬性spark.scheduler.mode來改變)等。
2) 建立TaskResultGetter,它的作用是通過執行緒池(Executors.newFixedThreadPool建立的,預設4個執行緒,執行緒名字以task-result-getter開頭,執行緒工廠預設是Executors.defaultThreadFactory),對slave傳送的task的執行結果進行處理。
TaskSchedulerImpl的主要組成,見程式碼清單3-29。
程式碼清單3-29 TaskSchedulerImpl的實現
var dagScheduler: DAGScheduler = null var backend: SchedulerBackend = null val mapOutputTracker = SparkEnv.get.mapOutputTracker var schedulableBuilder: SchedulableBuilder = null var rootPool: Pool = null // default scheduler is FIFO private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO") val schedulingMode: SchedulingMode = try { SchedulingMode.withName(schedulingModeConf.toUpperCase) } catch { case e: java.util.NoSuchElementException => throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf") } // This is a var so that we can reset it for testing purposes. private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
TaskSchedulerImpl的排程模式有FAIR和FIFO兩種。任務的最終排程實際都是落實到介面SchedulerBackend的具體實現上的。為方便分析,我們先來看看local模式中SchedulerBackend的實現LocalBackend。LocalBackend依賴於LocalActor與ActorSystem進行訊息通訊。LocalBackend參見程式碼清單3-30。
程式碼清單3-30 LocalBackend的實現
private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int) extends SchedulerBackend with ExecutorBackend { private val appId = "local-" + System.currentTimeMillis var localActor: ActorRef = null override def start() { localActor = SparkEnv.get.actorSystem.actorOf( Props(new LocalActor(scheduler, this, totalCores)), "LocalBackendActor") } override def stop() { localActor ! StopExecutor } override def reviveOffers() { localActor ! ReviveOffers } override def defaultParallelism() = scheduler.conf.getInt("spark.default.parallelism", totalCores) override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) { localActor ! KillTask(taskId, interruptThread) } override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) { localActor ! StatusUpdate(taskId, state, serializedData) } override def applicationId(): String = appId }
3.6.2 TaskSchedulerImpl的初始化
建立完TaskSchedulerImpl和LocalBackend後,對TaskSchedulerImpl呼叫方法initialize進行初始化。初始化過程如下:
1) 使TaskSchedulerImpl持有LocalBackend的引用。
2) 建立Pool,Pool中快取了排程佇列、排程演算法及TaskSetManager集合等資訊。
3) 建立FIFOSchedulableBuilder,FIFOSchedulableBuilder用來操作Pool中的排程佇列。
Initialize方法的實現見程式碼清單3-31。
程式碼清單3-31 TaskSchedulerImpl的初始化
def initialize(backend: SchedulerBackend) { this.backend = backend rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) } } schedulableBuilder.buildPools() }
3.7 建立和啟動DAGScheduler
DAGScheduler主要用於在任務正式交給TaskSchedulerImpl提交之前做一些準備工作,包括:建立Job,將DAG中的RDD劃分到不同的Stage、提交Stage,等等。建立DAGScheduler的程式碼如下。
@volatile private[spark] var dagScheduler: DAGScheduler = _ dagScheduler = new DAGScheduler(this)
DAGScheduler的資料結構主要維護jobId和stageId的關係、Stage、ActiveJob,以及快取的RDD的partitions的位置資訊,見程式碼清單3-32。
程式碼清單3-32 DAGScheduler維護的資料結構
private[scheduler] val nextJobId = new AtomicInteger(0) private[scheduler] def numTotalJobs: Int = nextJobId.get() private val nextStageId = new AtomicInteger(0) private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]] private[scheduler] val stageIdToStage = new HashMap[Int, Stage] private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage] private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob] // Stages we need to run whose parents aren't done private[scheduler] val waitingStages = new HashSet[Stage] // Stages we are running right now private[scheduler] val runningStages = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures private[scheduler] val failedStages = new HashSet[Stage] private[scheduler] val activeJobs = new HashSet[ActiveJob] // Contains the locations that each RDD's partitions are cached on private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]] private val failedEpoch = new HashMap[String, Long] private val dagSchedulerActorSupervisor = env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this))) private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
在構造DAGScheduler的時候會呼叫initializeEventProcessActor方法建立DAGSchedulerEventProcessActor,見程式碼清單3-33。
程式碼清單3-33 DAGSchedulerEventProcessActor的初始化
private[scheduler] var eventProcessActor: ActorRef = _ private def initializeEventProcessActor() { // blocking the thread until supervisor is started, which ensures eventProcessActor is // not null before any job is submitted implicit val timeout = Timeout(30 seconds) val initEventActorReply = dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this)) eventProcessActor = Await.result(initEventActorReply, timeout.duration). asInstanceOf[ActorRef] } initializeEventProcessActor()
這裡的DAGSchedulerActorSupervisor主要作為DAGSchedulerEventProcessActor的監管者,負責生成DAGSchedulerEventProcessActor。從程式碼清單3-34可以看出,DAGSchedulerActorSupervisor對於DAGSchedulerEventProcessActor採用了Akka的一對一監管策略。DAGSchedulerActorSupervisor一旦生成DAGSchedulerEventProcessActor,並註冊到ActorSystem,ActorSystem就會呼叫DAGSchedulerEventProcessActor的preStart,taskScheduler於是就持有了dagScheduler,見程式碼清單3-35。從程式碼清單3-35我們還看到DAGSchedulerEventProcessActor所能處理的訊息型別,比如handleJobSubmitted、handleBeginEvent、handleTaskCompletion等。DAGSchedulerEventProcessActor接受這些訊息後會有不同的處理動作,在本章,讀者只需要理解到這裡即可,後面章節用到時會詳細分析。
程式碼清單3-34 DAGSchedulerActorSupervisor的監管策略
private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler) extends Actor with Logging { override val supervisorStrategy = OneForOneStrategy() { case x: Exception => logError("eventProcesserActor failed; shutting down SparkContext", x) try { dagScheduler.doCancelAllJobs() } catch { case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t) } dagScheduler.sc.stop() Stop } def receive = { case p: Props => sender ! context.actorOf(p) case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor") } }
程式碼清單3-35 DAGSchedulerEventProcessActor的實現
private[scheduler] class DAGSchedulerEventProcessActor(dagScheduler: DAGScheduler) extends Actor with Logging { override def preStart() { dagScheduler.taskScheduler.setDAGScheduler(dagScheduler) } /** * The main event loop of the DAG scheduler. */ def receive = { case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) case AllJobsCancelled => dagScheduler.doCancelAllJobs() case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) case TaskSetFailed(taskSet, reason) => dagScheduler.handleTaskSetFailed(taskSet, reason) case ResubmitFailedStages => dagScheduler.resubmitFailedStages() } override def postStop() { // Cancel any active jobs in postStop hook dagScheduler.cleanUpAfterSchedulerStop() }
未完待續。。。
後記:自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流程,Spark自身的版本迭代也很快,如今最新已經是1.6.0。目前市面上另外2本原始碼研究的Spark書籍的版本分別是0.9.0版本和1.2.0版本,看來這些書的作者都與我一樣,遇到了這種問題。由於研究和出版都需要時間,所以不能及時跟上Spark的腳步,還請大家見諒。但是Spark核心部分的變化相對還是很少的,如果對版本不是過於追求,依然可以選擇本書。
相關推薦
《深入理解SPARK:核心思想與原始碼分析》——SparkContext的初始化(仲篇)——SparkUI、環境變數及排程
《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。 本文展現第3章第二部分的內容:
《深入理解Spark:核心思想與原始碼分析》(第2章)
《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 本文主要展示本書的第2章內容: Spark設計理念與基本架構 “若夫乘天地之正,而御六氣之辯,以遊無窮者,彼且惡乎待哉?” ——《莊子·逍遙遊》 n本章導讀: 上一章,介紹了Spark環境的搭建,為方便讀
《深入理解SPARK:核心思想與原始碼分析》一書正式出版上市
自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研
《深入理解Spark:核心思想與原始碼分析》(前言及第1章)
自己犧牲了7個月的週末和下班空閒時間,通過研究Spark原始碼和原理,總結整理的《深入理解Spark:核心思想與原始碼分析》一書現在已經正式出版上市,目前亞馬遜、京東、噹噹、天貓等網站均有銷售,歡迎感興趣的同學購買。我開始研究原始碼時的Spark版本是1.2.0,經過7個多月的研究和出版社近4個月的流
《深入理解Spark:核心思想與原始碼分析》——SparkContext的初始化(伯篇)——執行環境與元資料清理器
《深入理解Spark:核心思想與原始碼分析》一書第一章的內容請看連結《第1章 環境準備》 《深入理解Spark:核心思想與原始碼分析》一書第二章的內容請看連結《第2章 SPARK設計理念與基本架構》 由於本書的第3章內容較多,所以打算分別開闢四篇隨筆分別展現。本文展現第3章第一部分的內容: 第3章
《深入理解Spark-核心思想與原始碼分析》讀書筆記(1)
前兩章 第一章主要是講如何安裝和配置spark,以及如何匯入spark原始碼除錯執行;第二章主要講的是上次那本書《Spark快速大資料分析》的內容,科普一下spark的知識。 第三章 SparkContext的初始化 1. 概述 這章的主要內容就
《大型網站技術架構:核心原理與案例分析》讀書筆記 - 第2篇 架構
第2篇 架構 4 瞬時響應:網站的高效能架構 34 4.1 網站效能測試 35 效能測試是效能優化的前提和基礎,也是效能優化結果的檢查和度量標準。 4.1.1 不同視角下的網站效能 35 使用者:直觀感受到的快慢 開發:應用程式本身 運維:基礎設施效能和資源利用率 4.1.2 效
精盡MyBatis原始碼分析 - MyBatis初始化(二)之載入 Mapper 介面與 XML 對映檔案
> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址
SpringMVC原始碼分析--容器初始化(四)FrameworkServlet
一下SpringMVC配置檔案的地址contextConfigLocation的配置屬性,然後其呼叫的子類FrameworkServlet的initServletBean方法。 其實FrameworkServlet是springMVC初始化IOC容器的核心,通過讀取配置的c
springMVC原始碼分析--容器初始化(一)ContextLoaderListener
在spring Web中,需要初始化IOC容器,用於存放我們注入的各種物件。當tomcat啟動時首先會初始化一個web對應的IOC容器,用於初始化和注入各種我們在web執行過程中需要的物件。當tomcat啟動的時候是如何初始化IOC容器的,我們先看一下在web.xml中經常看
SpringMVC原始碼分析--容器初始化(五)DispatcherServlet
上一篇部落格SpringMVC原始碼分析--容器初始化(四)FrameworkServlet我們已經瞭解到了SpringMVC容器的初始化,SpringMVC對容器初始化後會進行一系列的其他屬性的初始化操作,在SpringMVC初始化完成之後會呼叫onRefresh(wac
SpringMVC原始碼分析--容器初始化(三)HttpServletBean
在上一篇部落格 springMVC原始碼分析--容器初始化(二)DispatcherServlet中,我們隊SpringMVC整體生命週期有一個簡單的說明,並沒有進行詳細的原始碼分析,接下來我們會根據部落格中提供的springMVC的生命週期圖來詳細的對SpringMVC的
精盡 MyBatis 原始碼分析 - MyBatis 初始化(一)之載入 mybatis-config.xml
> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址
精盡MyBatis原始碼分析 - MyBatis初始化(四)之 SQL 初始化(下)
> 該系列文件是本人在學習 Mybatis 的原始碼過程中總結下來的,可能對讀者不太友好,請結合我的原始碼註釋([Mybatis原始碼分析 GitHub 地址](https://github.com/liu844869663/mybatis-3)、[Mybatis-Spring 原始碼分析 GitHub 地址
深入理解Nginx:模組開發與架構解析
讀書筆記 第一部分 Nginx能幫我們做什麼 第1章 研究Nginx前的準備工作 1.1Nginx是什麼 1.2
《圖解Spark:核心技術與案例實戰》介紹及書附資源
本書中所使用到的測試資料、程式碼和安裝包放在百度盤提供 下載 ,連結: https://pan.baidu.com/s/1sXuOC3J-aHEc0E_kVWLqFg#list/path=%2F 另外在百度盤提供本書附錄 下載 ,連結: https://pan.baidu.com/s/1sO8NXqry
深入理解Nginx:模組開發與架構解析 讀書筆記
Nginx的作用 當我們在設計高效能web伺服器的時候,我們第一選擇是使用Nginx,因為nginx對伺服器效能上的挖掘已經到了非常高的水平,Nginx採用了無阻塞分階段的事件驅動框架。當nginx不能那個完全實現我們的業務需求的時候,我們可以在Nginx後端搭建一個非ng
《大型網站技術架構:核心原理與案例分析》-- 讀書筆記 (5) :網購秒殺系統
案例 並發 刷新 隨機 url 對策 -- 技術 動態生成 1. 秒殺活動的技術挑戰及應對策略 1.1 對現有網站業務造成沖擊 秒殺活動具有時間短,並發訪問量大的特點,必然會對現有業務造成沖擊。對策:秒殺系統獨立部署 1.2 高並發下的應用、
《大型網站技術架構:核心原理與案例分析》【PDF】下載
優化 均衡 1.7 3.3 架設 框架 應用服務器 博客 分布式服務框架 《大型網站技術架構:核心原理與案例分析》【PDF】下載鏈接: https://u253469.pipipan.com/fs/253469-230062557 內容簡介 本書通過梳理大型網站技
閱讀《大型網站技術架構:核心原理與案例分析》第五、六、七章,結合《河北省重大技術需求征集系統》,列舉實例分析采用的可用性和可修改性戰術
定時 並不會 表現 做出 span class 硬件 進行 情況 網站的可用性描述網站可有效訪問的特性,網站的頁面能完整呈現在用戶面前,需要經過很多個環節,任何一個環節出了問題,都可能導致網站頁面不可訪問。可用性指標是網站架構設計的重要指標,對外是服務承諾,對內是考核指