Spark 2.4.0 Source Code: SparkContext
Before diving into SparkContext, a quick review of Scala syntax. A Scala class has one primary constructor and may have several auxiliary constructors. An auxiliary constructor is defined with the keyword `def this`; any code in the class body that sits outside methods and auxiliary constructors belongs to the primary constructor, and the primary constructor runs every time an object is instantiated. For example:
```scala
class Person {
  var name: String = _
  var age: Int = 0
  println("primary constructor called")

  // auxiliary constructor
  def this(name: String, age: Int) {
    this() // must call the primary constructor first
    this.name = name
    this.age = age
  }

  def introduce(): Unit = {
    println("name :" + name + "-age :" + age)
  }
}

val jack = new Person("jack", 2)
jack.introduce()
```
Output:
```
primary constructor called
name :jack-age :2
```
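As a side note (a Scala detail, not something from the Spark source): auxiliary constructors can also delegate to one another, as long as each first calls a previously defined constructor, so every instantiation path eventually reaches the primary constructor. A minimal sketch:

```scala
class Point {
  var x: Int = 0
  var y: Int = 0
  println("primary constructor called")

  def this(x: Int) {
    this()  // reaches the primary constructor
    this.x = x
  }

  def this(x: Int, y: Int) {
    this(x) // delegates to the one-argument auxiliary constructor
    this.y = y
  }
}

val p = new Point(1, 2) // prints "primary constructor called" exactly once
```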
Now to the main topic: the more important parts of SparkContext's primary constructor:
```scala
try {
  ...
  // Create the Spark execution environment (cache, map output tracker, etc)
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)
  ...
  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  _taskScheduler.start()
}
```
First:
```scala
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)

_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
```
Here SparkContext creates the RpcEnv (inside the SparkEnv it just built) and, via setupEndpoint, registers a heartbeat Endpoint, the HeartbeatReceiver, with that RpcEnv.
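To make the Endpoint concept concrete, here is a minimal sketch of the pattern HeartbeatReceiver follows. Note that org.apache.spark.rpc.RpcEndpoint is private[spark], so this only compiles inside an org.apache.spark subpackage, and the IsSchedulerReady message is made up for illustration:

```scala
package org.apache.spark.demo // inside org.apache.spark to see private[spark] APIs

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical message, not part of Spark; stands in for e.g. TaskSchedulerIsSet.
case object IsSchedulerReady

class DemoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  // One-way messages (sent with RpcEndpointRef.send) arrive here.
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"got fire-and-forget message: $msg")
  }

  // Request/reply messages (sent with RpcEndpointRef.ask) arrive here,
  // mirroring how HeartbeatReceiver answers _heartbeatReceiver.ask[Boolean](...).
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case IsSchedulerReady => context.reply(true)
  }
}
```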
Next:
```scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
```
This calls SparkContext's own method to create the task scheduler; it returns a (SchedulerBackend, TaskScheduler) tuple:
```scala
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    // ...
    // standalone deploy mode
    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      // call the initialize method
      scheduler.initialize(backend)
      (backend, scheduler)
    // ...
  }
}
```
Inside the method, the master argument is pattern-matched to determine the deploy mode, and a matching (SchedulerBackend, TaskScheduler) pair is created. Taking standalone mode as the example: it builds a TaskSchedulerImpl and a StandaloneSchedulerBackend from the arguments, calls TaskSchedulerImpl's initialize method, and finally returns the pair as a tuple.
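The dispatch on the master string is plain regex pattern matching against patterns from SparkMasterRegex. The following self-contained sketch imitates that logic (the two regexes match those in Spark's source; everything else is demo scaffolding):

```scala
// Self-contained imitation of createTaskScheduler's master-string dispatch.
object MasterDispatchDemo {
  // Same patterns as SparkMasterRegex.LOCAL_N_REGEX / SPARK_REGEX in Spark.
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val SPARK_REGEX = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"                => "local mode, single thread"
    case LOCAL_N_REGEX(threads) => s"local mode, $threads threads"
    case SPARK_REGEX(sparkUrl)  =>
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      s"standalone mode, masters: ${masterUrls.mkString(", ")}"
    case other                  => s"some other cluster manager: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe("local[*]"))                      // local mode, * threads
    println(describe("spark://host1:7077,host2:7077")) // standalone mode, ...
  }
}
```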
scheduler.initialize(backend) simply creates a SchedulableBuilder according to the configured schedulingMode. The SchedulableBuilder is a wrapper around the schedulable tree (the pool hierarchy) and is responsible for scheduling TaskSets:
```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
```
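The schedulingMode here comes from the spark.scheduler.mode configuration key (FIFO by default). A minimal usage example for switching to fair scheduling; the master and app name are placeholder assumptions, the key point is the config setting that selects FairSchedulableBuilder above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")                // placeholder master
  .setAppName("fair-scheduling-demo")   // placeholder app name
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO

val sc = new SparkContext(conf)
```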
Next, the following two lines:
```scala
_dagScheduler = new DAGScheduler(this)
```
This creates the DAGScheduler: the high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG (directed acyclic graph) of stages for each job, keeps track of which RDDs and stage outputs are materialized (written to disk or memory), and finds a minimum-cost schedule to run the job. It then submits stages as TaskSets to the underlying TaskSchedulerImpl to run them on the cluster. Besides building the stage DAG, it also determines the preferred locations to run each task on, based on the current cache status, and passes them to the low-level TaskSchedulerImpl. Furthermore, it handles failures caused by lost shuffle output files, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler itself, which retries each task a few times and only cancels the whole stage if that still fails.
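To make "stages" concrete, a classic word count shows where the DAGScheduler cuts the job: reduceByKey requires a shuffle, so everything before it becomes one stage and the reduce side becomes another (the input file name and master are placeholder assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("stage-demo"))

val counts = sc.textFile("input.txt")   // assumed input file
  .flatMap(_.split(" "))                // still stage 0: narrow dependencies
  .map((_, 1))                          // still stage 0
  .reduceByKey(_ + _)                   // shuffle boundary: stage 1 starts here

counts.collect().foreach(println)       // collect() triggers the job; the
                                        // DAGScheduler submits both stages as TaskSets
sc.stop()
```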
```scala
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
```
With the TaskScheduler and SchedulerBackend created above, this ask tells the HeartbeatReceiver endpoint that the TaskScheduler is now set.
Finally:
```scala
_taskScheduler.start()
```
TaskSchedulerImpl's start() essentially delegates to the SchedulerBackend's start(). Paraphrased from the 2.4 source (slightly simplified):
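```scala
// Paraphrased from TaskSchedulerImpl.start() in Spark 2.4:
override def start() {
  backend.start() // in standalone mode: StandaloneSchedulerBackend.start()

  // If speculation is enabled (and we're not in local mode), start a timer
  // thread that periodically looks for speculatable tasks.
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```

So in standalone mode, backend.start() runs StandaloneSchedulerBackend.start(), which executes this: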
```scala
override def start() {
  super.start()

  // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
  // mode. In cluster mode, the code that submits the application to the Master needs to connect
  // to the launcher instead.
  if (sc.deployMode == "client") {
    launcherBackend.connect()
  }

  // parameter setup
  val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
    webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  waitForRegistration()
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
```
Two objects are created here: ApplicationDescription and StandaloneAppClient. ApplicationDescription is, as the name suggests, a description of the application, such as the resources it requires; StandaloneAppClient is responsible for the application's communication with the Spark cluster. SchedulerBackend's start() ultimately calls StandaloneAppClient's start(), shown below:
```scala
def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
}
```
This just launches an RpcEndpoint (the ClientEndpoint), which calls back into the listener. For details of the RPC mechanism, see https://www.cnblogs.com/superhedantou/p/7570692.html
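For context, here is the registration handshake that ClientEndpoint performs, paraphrased and simplified from StandaloneAppClient in the 2.4 source: when it starts, it asynchronously sends RegisterApplication (carrying the ApplicationDescription) to each configured master and considers itself registered once a RegisteredApplication reply arrives:

```scala
// Paraphrased/simplified from StandaloneAppClient.ClientEndpoint in Spark 2.4.
override def onStart(): Unit = {
  try {
    registerWithMaster(1) // try each configured master, with retries
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      stop()
  }
}

override def receive: PartialFunction[Any, Unit] = {
  case RegisteredApplication(appId_, masterRef) =>
    // The master accepted the application: remember the appId and master ref.
    appId.set(appId_)
    registered.set(true)
    master = Some(masterRef)
    listener.connected(appId.get)
  // ... other messages (ExecutorAdded, MasterChanged, ...) elided
}
```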
To wrap up, the overall flow in brief: SparkContext creates SparkEnv (and its RpcEnv) → registers the HeartbeatReceiver endpoint → createTaskScheduler returns (StandaloneSchedulerBackend, TaskSchedulerImpl) → DAGScheduler is created → taskScheduler.start() calls backend.start() → StandaloneAppClient registers the application with the Master.