1. 程式人生 > >SparkContext建立初始化完成的主要工作

SparkContext建立初始化完成的主要工作


SparkContext是提交作業到叢集的切入點,所以接下來講解一下SparkContext的初始化過程。


當我們使用spark-submit指令碼提交一個作業之後,流程如下:


提交作業之後(省略SparkSubmit的分析不走) -> 反射建立mainClass  -> 初始化SparkContext -> 使用sparkContext建立RDD -> 執行count運算元runJob(runJob是SparkContext中的)方法觸發作業->呼叫sparkContext中dagScheduler的runJob方法->呼叫dagScheduler的submitJob方法完成提交,返回一個waiter物件!

通過上面流程可以知道,我們自己的mainClass是通過反射呼叫的main方法,然後執行mainClass中的我們自己實現的邏輯!SparkContext是一個近2000多行的程式碼類,負責RDD的建立,廣播變數,加速器的呼叫。可見其重要性接!其概括性主要完成如下任務:

主構造器主要完成的事情:1) 建立SparkEnv 2)建立TaskScheduler 3)建立DAGScheduler 4)啟動TaskScheduler

下來正式開始分析SparkContext的初始化過程:

1:SparkContext重要的成員有:

/**
    * SparkEnv,Holds all the runtime environment objects for a running Spark instance 
    */
  private var _env: SparkEnv = _

 /**
    *_schedulerBackend is a cluster manager ,holded by taskscheduler .
    */
  private var _schedulerBackend: SchedulerBackend = _

  /**
    * taskScheduler,底層的task排程介面
    */
  private var _taskScheduler: TaskScheduler = _

  /**
    * dagScheduler主要負責切分stage的,決定每個Task的最佳位置等功能
    */
  @volatile private var _dagScheduler: DAGScheduler = _


2: 在SparkContext主構造器中重要成員的初始化

    // Create the Spark execution environment (cache, map output tracker, etc)
    //TODO SparkEnv其實就是Driver的RpcEnv環境,_conf是
    _env = createSparkEnv(_conf, isLocal, listenerBus) 
傳入sparkConf變數,isLocal等,SparkEnv其中包含Driver的一個EndPoint,負責快取,map輸出跟蹤等。
    // Create and start the scheduler
    //TODO 傳說中的TaskScheduler建立
    //TODO 先建立TaskScheduler原因:DAGScheduler的構造引數需要taskScheduler成員(taskScheduler完成的initialize方法呼叫)
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
上面程式碼完成_schedulerBackend 和_taskScheduler成員的初始化,createTaskScheduler方法是一個重要方法,在createTaskScheduler方法中根據實際的master url建立具體的_schedulerBackend。例如本地模式的資源管理器LocalSchedulerBackend,standalone模式為 StandaloneSchedulerBackend,yarn模式為YarnSchedulerBackend(spark yarn模組中)。建立完具體SchedulerBackend之後將其設定給taskScheduler,後期taskScheduler提交task時候就是提交給其自身的SchedulerBackend。到此就完成了_schedulerBackend 和_taskScheduler成員的初始化。
//TODO 傳說中的DAGScheduler建立
    _dagScheduler = new DAGScheduler(this)

接下來開始初始化_dagScheduler,傳入當前的sparkContext物件,_dagScheduler持有_taskScheduler引用(因為_dagScheduler劃分完畢stage之後,將stage轉化為taskSet之後需要提交給_taskScheduler )。
 _taskScheduler.start()
啟動_taskScheduler,(在SparkStandalone叢集中)在taskScheduler的start方法重主要是連線叢集,建立StandaloneAppClient,在其Onstart方法中給Master發訊息提交提交作業(Master接收到提交作業之後呼叫master的schedule()的方法進行資源排程)。