1. 程式人生 > >Spark-2.4.0原始碼:sparkContext

Spark-2.4.0原始碼:sparkContext

  在看sparkContext之前,先回顧一下Scala的語法。Scala建構函式分主構造和輔建構函式,輔建構函式是關鍵字def+this定義的,而類中不在方法體也不在輔建構函式中的程式碼就是主建構函式,例項化物件的時候主建構函式都會被執行,例:

  

class person(name String,age Int){
    println("主建構函式被呼叫")

    def this(name String,age Int){ //輔建構函式
        this ()    //必須先呼叫主建構函式
        this.name = name
        this.age = age
    }
    
    def introduce(){
        println("name :" + name + "-age :" + age)
    }
}

val jack = new person("jack",2)

jack.introduce()

  執行結果:

  主建構函式被呼叫

  name  :jack-age :2

 

  切入正題,看sparkContext的主建構函式比較重要的一些程式碼:

try{
        ...
        // Create the Spark execution environment (cache, map output tracker, etc)
        _env = createSparkEnv(_conf, isLocal, listenerBus)
        SparkEnv.set(_env)
        
        ...
        
        // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
        // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
        HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

        
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    }
    

  首先:

   _env = createSparkEnv(_conf, isLocal, listenerBus)
    SparkEnv.set(_env)
   _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

   這裡是在sparkContext中建立rpcEnv,並通過 setupEndpoint 向 rpcEnv 註冊一個心跳的 Endpoint。

  接著:

 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

  調的sparkContext自己的方法,建立taskScheduler,返回的是一個 (SchedulerBackend, TaskScheduler) 元組

private def createTaskScheduler(
      sc: SparkContext,
      master: String,
      deployMode: String): (SchedulerBackend, TaskScheduler) = {
    import SparkMasterRegex._

    // When running locally, don't try to re-execute tasks on failure.
    val MAX_LOCAL_TASK_FAILURES = 1

    master match {
        //...
        
        //standalone的提交模式
      case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        //呼叫初始化方法
        scheduler.initialize(backend)
        (backend, scheduler)
    }
    
        //...
    }

  方法內部根據master引數判斷不同的提交模式,建立不同的(SchedulerBackend, TaskScheduler) ,拿standalon模式舉例,根據入參建立TaskSchedulerImpl和StandalonSchedulerBackend,再呼叫TaskSchedulerImpl的初始化方法,最後返回一個元組。

   scheduler.initialize(backend),其實就是根據不同的schedulingMode建立不同的schedulableBuilder,它就是對Scheduleable tree的封裝,負責對taskSet的排程。

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

  接著下面兩行程式碼:

  _dagScheduler = new DAGScheduler(this)

  建立DAG有向無環圖,實現類面向stage的排程機制的高層次排程層,他會為每個stage計算DAG(有向無環圖),追蹤RDD和stage的輸出是否被物化(寫入磁碟或記憶體),並且尋找一個最少消耗的排程機制來執行job。它會將stage作為taskSets提交到底層的TaskSchedulerImpl上來在叢集執行。除了處理stage的DAG,它還負責決定執行每個task的最佳位置,基於當前的快取狀態,將最佳位置提交給底層的TaskSchedulerImpl,此外,他會處理由於每個shuffle輸出檔案導致的失敗,在這種情況下舊的stage可能會被重新提交。一個stage內部的失敗,如果不是由於shuffle檔案丟失導致的失敗,會被taskScheduler處理,它會多次重試每個task,還不行才會取消整個stage。

  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

   在上面建立好了TaskScheduler和SchedulerBackend後,告訴 HeartbeatReceiver(心跳) 的監聽端。

   最後:

  _taskScheduler.start()

   在TaskSchedulerImpl的start()方法中調的是SchedulerBackend的start()方法,所以start()方法執行的是這段:

override def start() {
    super.start()

    // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
    // mode. In cluster mode, the code that submits the application to the Master needs to connect
    // to the launcher instead.
    if (sc.deployMode == "client") {
      launcherBackend.connect()
    }

    //引數設定
    
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
  }

  這裡建立了兩個物件:AppliactionDescription和AppClient,AppliactionDescription顧名思義就是對Application的描述類,比如它需要的資源等;AppClient負責負責為application與spark叢集通訊。SchedulerBackend的start()最終呼叫了AppClient的start(),程式碼如下:

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }

   啟動一個rpcEndPoint並回調給監聽器,RPC原理可看這篇 https://www.cnblogs.com/superhedantou/p/7570692.html

  

   最後畫個大概流程圖

&n