Spark 核心篇-SparkContext
Spark 核心篇-SparkContext
閱讀目錄
- 1、功能描述
- 2、相關元件
- 3、程式碼分析
- 3.1 初始設定
- 3.2 建立執行環境 SparkEnv
- 3.3 建立 SparkUI
- 3.4 Hadoop 相關配置
- 3.5 Executor 環境變數
- 3.6 註冊 HeartbeatReceiver 心跳接收器
- 3.7 建立任務排程器 TaskScheduler
- 3.8 建立和啟動 DAGScheduler
- 3.9 TaskScheduler 的啟動
- 3.10 啟動測量系統 MetricsSystem
- 3.11 建立事件日誌監聽器
- 3.13 ContextCleaner 的建立與啟動
- 3.14 額外的 SparkListener 與啟動事件
- 3.15 Spark 環境更新
- 3.16 投遞應用程式啟動事件
- 3.17 建立 DAGSchedulerSource、BlockManagerSource和 ExecutorAllocationManagerSource
- 3.18 將 SparkContext 標記為啟用
本章內容:
1、功能描述
本篇文章就要根據原始碼分析SparkContext所做的一些事情,用過Spark的開發者都知道SparkContext是編寫Spark程式用到的第一個類,足以說明SparkContext的重要性;這裡先摘抄SparkContext原始碼註釋來簡單介紹介紹SparkContext,註釋的第一句話就是說SparkContext為Spark的主要入口點,簡明扼要,如把Spark叢集當作服務端那Spark Driver就是客戶端,SparkContext則是客戶端的核心;如註釋所說 SparkContext用於連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),所以說SparkContext為Spark程式的根本都不為過。
SparkContext 是 Spark 中元老級的 API,從0.x.x 版本就已經存在。有過 Spark 使用經驗會感覺 SparkContext 已經太老了,然後 SparkContext 始終跟隨著 Spark 的迭代不斷向前。SparkContext 內部雖然已經發生了很大的變化,有些內部元件已經廢棄,有些元件已經優化,還有一些新的元件不斷加入,不斷煥發的強大的魅力,是 Spark 的靈魂。
1 2 3 4 5 6 7 8 9 10 11 |
|
也就是說SparkContext是Spark的入口,相當於應用程式的main函式。目前在一個JVM程序中可以建立多個SparkContext,但是隻能有一個active級別的。如果你需要建立一個新的SparkContext例項,必須先呼叫stop方法停掉當前active級別的SparkContext例項。
圖1 Spark 架構圖
圖片來自Spark官網,可以看到SparkContext處於DriverProgram核心位置,所有與Cluster、Worker Node互動的操作都需要SparkContext來完成。
圖2 SparkContext 在 Spark 應用程式中的扮演的主要角色
圖3 Driver 上執行的服務元件
2、相關元件
名稱 |
說明 |
---|---|
SparkConf |
Spark配置類,配置已鍵值對形式儲存,封裝了一個ConcurrentHashMap類例項settings用於儲存Spark的配置資訊。 |
SparkEnv | SparkContext中非常重要的類,它維護著Spark的執行環境,所有的執行緒都可以通過SparkContext訪問到同一個SparkEnv物件。 |
LiveListenerBus | SparkContext 中的事件匯流排,可以接收各種使用方的事件,並且非同步傳遞Spark事件監聽與SparkListeners監聽器的註冊。 |
SparkUI | 為Spark監控Web平臺提供了Spark環境、任務的整個生命週期的監控。 |
TaskScheduler | 為Spark的任務排程器,Spark通過他提交任務並且請求叢集排程任務。因其排程的 Task 由 DAGScheduler 建立,所以 DAGScheduler 是 TaskScheduler 的前置排程。 |
DAGScheduler | 為高階的、基於Stage的排程器, 負責建立 Job,將 DAG 中的 RDD 劃分到不同的 Stage,並將Stage作為Tasksets提交給底層排程器TaskScheduler執行。 |
HeartbeatReceiver | 心跳接收器,所有 Executor 都會向HeartbeatReceiver 傳送心跳,當其接收到 Executor 的心跳資訊後,首先更新 Executor 的最後可見時間,然後將此資訊交給 TaskScheduler 進一步處理。 |
ExecutorAllocationManager |
Executor 動態分配管理器,根據負載動態的分配與刪除Executor,可通過其設定動態分配最小Executor、最大Executor、初始Executor數量等配置。 |
ContextClearner | 上下文清理器,為RDD、shuffle、broadcast狀態的非同步清理器,清理超出應用範圍的RDD、ShuffleDependency、Broadcast物件。 |
SparkStatusTracker | 低級別的狀態報告API,只能提供非常脆弱的一致性機制,對Job(作業)、Stage(階段)的狀態進行監控。 |
HadoopConfiguration | Spark預設使用HDFS來作為分散式檔案系統,用於獲取Hadoop配置資訊。 |
以上的物件為SparkContext使用到的主要物件,可以看到SparkContext包含了Spark程式用到的幾乎所有核心物件可見SparkContext的重要性;建立SparkContext時會新增一個鉤子到ShutdownHookManager中用於在Spark程式關閉時對上述物件進行清理,在建立RDD等操作也會判斷SparkContext是否已stop;通常情況下一個Driver只會有一個SparkContext例項,但可通過spark.driver.allowMultipleContexts配置來允許driver中存在多個SparkContext例項。
3、程式碼分析
程式碼 |
說明 |
---|---|
SparkContext.markPartiallyConstructed(this, allowMultipleContexts) |
用來確保例項的唯一性 |
try { _conf = config.clone() _conf.validateSettings() if (!_conf.contains("spark.master")) { throw new SparkException("A master URL must be set in your configuration") } if (!_conf.contains("spark.app.name")) { throw new SparkException("An application name must be set in your configuration") } // log out spark.app.name in the Spark driver logs logInfo(s"Submitted application: $appName") // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) { throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " + "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") } if (_conf.getBoolean("spark.logConf", false)) { logInfo("Spark configuration:\n" + _conf.toDebugString) } // Set Spark driver host and port system properties. This explicitly sets the configuration // instead of relying on the default value of the config constant. _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS)) _conf.setIfMissing("spark.driver.port", "0") _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER) _jars = Utils.getUserJars(_conf) _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)) .toSeq.flatten _eventLogDir = if (isEventLogEnabled) { val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR) .stripSuffix("/") Some(Utils.resolveURI(unresolvedDir)) } else { None } _eventLogCodec = { val compress = _conf.getBoolean("spark.eventLog.compress", false) if (compress && isEventLogEnabled) { Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName) } else { None } } _listenerBus = new LiveListenerBus(_conf) // Initialize the app status store and listener before SparkEnv is created so that it gets // all events. _statusStore = AppStatusStore.createLiveStore(conf) listenerBus.addToStatusQueue(_statusStore.listener.get) // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) // If running the REPL, register the repl's output dir with the file server. _conf.getOption("spark.repl.class.outputDir").foreach { path => val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path)) _conf.set("spark.repl.class.uri", replUri) } _statusTracker = new SparkStatusTracker(this, _statusStore) _progressBar = if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None } _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "", startTime)) } else { // For tests, do not enable the UI None } // Bind the UI before starting the task scheduler to communicate // the bound port to the cluster manager properly _ui.foreach(_.bind()) _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) // Add each JAR given through the constructor if (jars != null) { jars.foreach(addJar) } if (files != null) { files.foreach(addFile) } _executorMemory = _conf.getOption("spark.executor.memory") .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")) .map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(1024) // Convert java options to env vars as a work around // since we can't set env vars directly in sbt. for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing")) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES") = v } // The Mesos scheduler backend relies on this environment variable to set executor memory. // TODO: Set this only in the Mesos scheduler. executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640) _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) // Create and start the scheduler val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor _taskScheduler.start() _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) if (_conf.getBoolean("spark.ui.reverseProxy", false)) { System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId) } _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) // The metrics system for Driver need to be set spark.app.id to app ID. // So it should start after we get app ID from the task scheduler and set spark.app.id. _env.metricsSystem.start() // Attach the driver metrics servlet handler to the web ui after the metrics system is started. _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) _eventLogger = if (isEventLogEnabled) { val logger = new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() listenerBus.addToEventLogQueue(logger) Some(logger) } else { None } // Optionally scale number of executors dynamically based on workload. Exposed for testing. val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) _executorAllocationManager = if (dynamicAllocationEnabled) { schedulerBackend match { case b: ExecutorAllocationClient => Some(new ExecutorAllocationManager( schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf, _env.blockManager.master)) case _ => None } } else { None } _executorAllocationManager.foreach(_.start()) _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } _cleaner.foreach(_.start()) setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() // Post init _taskScheduler.postStartHook() _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } // Make sure the context is stopped if the user forgets about it. This avoids leaving // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM // is killed, though. logDebug("Adding shutdown hook") // force eager creation of logger _shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo("Invoking stop() from shutdown hook") stop() } } catch { case NonFatal(e) => logError("Error initializing SparkContext.", e) try { stop() } catch { case NonFatal(inner) => logError("Error stopping SparkContext after init error.", inner) } finally { throw e } } |
|
SparkContext.setActiveContext(this, allowMultipleContexts) |
將SparkContext標記為啟用 |
3.1 初始設定
首先儲存了當前的CallSite資訊,並且判斷是否允許建立多個SparkContext例項,使用的是spark.driver.allowMultipleContexts屬性,預設為false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
|
接下來是對SparkConf進行復制,然後對各種配置資訊進行校驗,其中最主要的就是SparkConf必須指定 spark.master(用於設定部署模式)和 spark.app.name(應用程式名稱)屬性,否則會丟擲異常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
|
3.2 建立執行環境 SparkEnv
SparkEnv是Spark的執行環境物件,其中包括與眾多Executor指向相關的物件。在local模式下Driver會建立Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend程序中也會建立Executor,所以SparkEnv存在於Driver或者CoarseGrainedExecutorBackend程序中。
建立SparkEnv主要使用SparkEnv的createDriverEnv方法,有四個引數:conf、isLocal、listenerBus 以及在本地模式下driver執行executor需要的numberCores。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
|
3.3 建立 SparkUI
SparkUI 提供了用瀏覽器訪問具有樣式及佈局並且提供豐富監控資料的頁面。其採用的是時間監聽機制。傳送的事件會存入快取,由定時排程器取出後分配給監聽此事件的監聽器對監控資料進行更新。如果不需要SparkUI,則可以將spark.ui.enabled置為false。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
3.4 Hadoop 相關配置
預設情況下,Spark使用HDFS作為分散式檔案系統,所以需要獲取Hadoop相關的配置資訊:
1 2 3 4 5 6 7 8 9 10 |
|
獲取的配置資訊包括:
- 將Amazon S3檔案系統的AWS_ACCESS_KEY_ID和 AWS_SECRET_ACCESS_KEY載入到Hadoop的Configuration;
- 將SparkConf中所有的以spark.hadoop.開頭的屬性都賦值到Hadoop的Configuration;
- 將SparkConf的屬性spark.buffer.size複製到Hadoop的Configuration的配置io.file.buffer.size。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
|
3.5 Executor 環境變數
executorEnvs包含的環境變數將會註冊應用程式的過程中傳送給Master,Master給Worker傳送排程後,Worker最終使用executorEnvs提供的資訊啟動Executor。
通過配置spark.executor.memory指定Executor佔用的記憶體的大小,也可以配置系統變數SPARK_EXECUTOR_MEMORY或者SPARK_MEM設定其大小。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
|
executorEnvs是由一個HashMap儲存:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
|
3.6 註冊 HeartbeatReceiver 心跳接收器
在 Spark 的實際生產環境中,Executor 是執行在不同的節點上的。在 local 模式下的 Driver 與 Executor 屬於同一個程序,所以 Dirver 與 Executor 可以直接使用本地呼叫互動,當 Executor 執行出現問題時,Driver 可以很方便地知道,例如,通過捕獲異常。但是在生產環境下,Driver 與 Executor 很可能不在同一個程序內,他們也許執行在不同的機器上,甚至在不同的機房裡,因此 Driver 對 Executor 失去掌握。為了能夠掌控 Executor,在 Driver 中建立了這個心跳接收器。
建立 HeartbearReceiver 的程式碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
|
上面的程式碼中使用了 SparkEnv 的子元件 NettyRpcEnv 的 setupEndpoint 方法,此方法的作用是想 RpcEnv 的 Dispatcher 註冊 HeartbeatReceiver,並返回 HeartbeatReceiver 的 NettyRpcEndpointRef 引用。
3.7 建立任務排程器 TaskScheduler
TaskScheduler也是SparkContext的重要組成部分,負責任務的提交,請求叢集管理器對任務排程,並且負責傳送的任務到叢集,執行它們,任務失敗的重試,以及慢任務的在其他節點上重試。 其中給應用程式分配並執行 Executor為一級排程,而給任務分配 Executor 並執行任務則為二級排程。另外 TaskScheduler 也可以看做任務排程的客戶端。
- 為 TaskSet建立和維護一個TaskSetManager並追蹤任務的本地性以及錯誤資訊;
- 遇到Straggle 任務會方到其他的節點進行重試;
- 向DAGScheduler彙報執行情況, 包括在Shuffle輸出lost的時候報告fetch failed 錯誤等資訊;
TaskScheduler負責任務排程資源分配,SchedulerBackend負責與Master、Worker通訊收集Worker上分配給該應用使用的資源情況。
圖4 SparkContext 建立 Task Scheduler 和 Scheduler Backend
建立 TaskScheduler 的程式碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
|
createTaskScheduler方法根據master的配置匹配部署模式,建立TaskSchedulerImpl,並生成不同的SchedulerBackend。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 |
|