SparkContext建立初始化完成的主要工作
阿新 • • 發佈:2019-02-12
SparkContext是提交作業到叢集的切入點,所以接下來講解一下SparkContext的初始化過程。
當我們使用spark-submit指令碼提交一個作業之後,流程如下:
提交作業之後(省略SparkSubmit的分析不走) -> 反射建立mainClass -> 初始化SparkContext -> 使用sparkContext建立RDD -> 執行count運算元runJob(runJob是SparkContext中的)方法觸發作業->呼叫sparkContext中dagScheduler的runJob方法->呼叫dagScheduler的submitJob方法完成提交,返回一個waiter物件!
通過上面流程可以知道,我們自己的mainClass是通過反射呼叫的main方法,然後執行mainClass中的我們自己實現的邏輯!SparkContext是一個近2000多行的程式碼類,負責RDD的建立,廣播變數,加速器的呼叫。可見其重要性接!其概括性主要完成如下任務:
主構造器主要完成的事情:1) 建立SparkEnv 2)建立TaskScheduler 3)建立DAGScheduler 4)啟動TaskScheduler
下來正式開始分析SparkContext的初始化過程:
1:SparkContext重要的成員有:
/** * SparkEnv,Holds all the runtime environment objects for a running Spark instance */ private var _env: SparkEnv = _
/**
*_schedulerBackend is a cluster manager ,holded by taskscheduler .
*/
private var _schedulerBackend: SchedulerBackend = _
/**
* taskScheduler,底層的task排程介面
*/
private var _taskScheduler: TaskScheduler = _
/** * dagScheduler主要負責切分stage的,決定每個Task的最佳位置等功能 */ @volatile private var _dagScheduler: DAGScheduler = _
2: 在SparkContext主構造器中重要成員的初始化
// Create the Spark execution environment (cache, map output tracker, etc)
//TODO SparkEnv其實就是Driver的RpcEnv環境,_conf是
_env = createSparkEnv(_conf, isLocal, listenerBus)
傳入sparkConf變數,isLocal等,SparkEnv其中包含Driver的一個EndPoint,負責快取,map輸出跟蹤等。
// Create and start the scheduler
//TODO 傳說中的TaskScheduler建立
//TODO 先建立TaskScheduler原因:DAGScheduler的構造引數需要taskScheduler成員(taskScheduler完成的initialize方法呼叫)
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
上面程式碼完成_schedulerBackend 和_taskScheduler成員的初始化,createTaskScheduler方法是一個重要方法,在createTaskScheduler方法中根據實際的master url建立具體的_schedulerBackend。例如本地模式的資源管理器LocalSchedulerBackend,standalone模式為 StandaloneSchedulerBackend,yarn模式為YarnSchedulerBackend(spark
yarn模組中)。建立完具體SchedulerBackend之後將其設定給taskScheduler,後期taskScheduler提交task時候就是提交給其自身的SchedulerBackend。到此就完成了_schedulerBackend 和_taskScheduler成員的初始化。//TODO 傳說中的DAGScheduler建立
_dagScheduler = new DAGScheduler(this)
接下來開始初始化_dagScheduler,傳入當前的sparkContext物件,_dagScheduler持有_taskScheduler引用(因為_dagScheduler劃分完畢stage之後,將stage轉化為taskSet之後需要提交給_taskScheduler )。
_taskScheduler.start()
啟動_taskScheduler,(在SparkStandalone叢集中)在taskScheduler的start方法重主要是連線叢集,建立StandaloneAppClient,在其Onstart方法中給Master發訊息提交提交作業(Master接收到提交作業之後呼叫master的schedule()的方法進行資源排程)。