Spark 1.6 Source Code Walkthrough: TaskScheduler
阿新 • Published: 2018-12-16
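TaskScheduler is one of SparkContext's most important members. It submits tasks and asks the cluster manager to schedule them, so it can also be regarded as the client side of task scheduling.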
SparkContext creates the TaskScheduler at line 522:
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
The concrete implementation is createTaskScheduler, at line 2592 of SparkContext:
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._
  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1
  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)
    // ... (cases for the other master URL formats omitted)
  }
}
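The dispatch on master can be illustrated with a small standalone pattern match. This is a simplified sketch, not Spark's code: MasterDispatch, MasterKind, Local, Standalone and Unsupported are hypothetical names, and the regexes only approximate the local[N], local[N, maxFailures] and spark:// formats that SparkMasterRegex handles.

```scala
import scala.util.matching.Regex

// Hypothetical description of what kind of scheduler/backend pair a master URL selects.
sealed trait MasterKind
final case class Local(threads: Int, maxTaskFailures: Int) extends MasterKind
final case class Standalone(hosts: String) extends MasterKind
final case class Unsupported(master: String) extends MasterKind

object MasterDispatch {
  // Hypothetical regexes modeled on the local[N] / local[N, maxFailures] / spark:// formats.
  // Regex extractors in a match must cover the whole string, so no anchors are needed.
  private val LocalN: Regex = """local\[([0-9]+|\*)\]""".r
  private val LocalNFailures: Regex = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val SparkUrl: Regex = """spark://(.*)""".r

  // As in the original: plain local modes do not retry failed tasks.
  val MaxLocalTaskFailures = 1

  // "*" means one thread per available core.
  private def threadCount(s: String): Int =
    if (s == "*") Runtime.getRuntime.availableProcessors() else s.toInt

  def classify(master: String): MasterKind = master match {
    case "local"                      => Local(1, MaxLocalTaskFailures)
    case LocalN(threads)              => Local(threadCount(threads), MaxLocalTaskFailures)
    case LocalNFailures(threads, max) => Local(threadCount(threads), max.toInt)
    case SparkUrl(hosts)              => Standalone(hosts)
    case other                        => Unsupported(other)
  }
}
```

For example, MasterDispatch.classify("local[2, 3]") yields Local(2, 3), while plain "local" yields a single thread with no retries, matching the MAX_LOCAL_TASK_FAILURES = 1 branch above.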
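Its behavior depends on the master URL; this article uses local mode as the example. In that case it creates a TaskSchedulerImpl and a LocalBackend.
The TaskSchedulerImpl constructor code, at line 102: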
var dagScheduler: DAGScheduler = null
var backend: SchedulerBackend = null
val mapOutputTracker = SparkEnv.get.mapOutputTracker
var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
val schedulingMode: SchedulingMode = try {
  SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
  case e: java.util.NoSuchElementException =>
    throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
}
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
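Analysis: (1) It reads configuration such as the scheduling mode (FIFO or FAIR).
(2) It creates a TaskResultGetter, which uses a thread pool to process the task results sent back by the Executors on the Workers.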
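The mode lookup relies on Scala's Enumeration.withName, which throws NoSuchElementException for an unknown name; the catch block turns that into a clearer error. A minimal sketch of the same parsing, assuming a plain Map in place of SparkConf; SchedulingModeSketch and ModeParser are hypothetical stand-ins, and IllegalArgumentException replaces SparkException.

```scala
// Hypothetical stand-in for Spark's SchedulingMode enumeration (simplified to two values).
object SchedulingModeSketch extends Enumeration {
  val FAIR, FIFO = Value
}

object ModeParser {
  // conf is a plain Map standing in for SparkConf (assumption for this sketch).
  def parse(conf: Map[String, String]): SchedulingModeSketch.Value = {
    val raw = conf.getOrElse("spark.scheduler.mode", "FIFO") // FIFO is the default
    try {
      // withName matches the exact name, hence the upper-casing first
      SchedulingModeSketch.withName(raw.toUpperCase)
    } catch {
      case _: NoSuchElementException =>
        throw new IllegalArgumentException(s"Unrecognized spark.scheduler.mode: $raw")
    }
  }
}
```

Upper-casing before the lookup is what lets a value such as "fair" be accepted.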
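TaskSchedulerImpl supports two scheduling modes, but in either mode the actual scheduling of tasks is ultimately carried out by a concrete SchedulerBackend implementation.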
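This division of labor can be illustrated with a toy pair of classes: the scheduler only queues tasks and then asks its backend to make resource offers, and the backend decides how many tasks to launch based on its cores. All names here (MiniBackend, MiniTaskScheduler, MiniLocalBackend, resourceOffer) are hypothetical; only the idea of the scheduler calling reviveOffers() on its backend echoes Spark, and the real local backend goes through a message endpoint rather than a direct call.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical minimal backend interface.
trait MiniBackend {
  def reviveOffers(): Unit
}

// Hypothetical scheduler: it queues work but never launches tasks itself.
class MiniTaskScheduler {
  private var backend: MiniBackend = null
  val pending: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def initialize(b: MiniBackend): Unit = { backend = b }

  // Whatever the scheduling mode, submission ends with a call into the backend.
  def submitTasks(tasks: Seq[String]): Unit = {
    pending ++= tasks
    backend.reviveOffers()
  }

  // Called back by the backend: hand out at most freeCores queued tasks.
  def resourceOffer(freeCores: Int): Seq[String] = {
    val launched = pending.take(freeCores).toList
    pending.remove(0, launched.size)
    launched
  }
}

// Hypothetical local backend: offers all of its cores each time it is revived.
class MiniLocalBackend(scheduler: MiniTaskScheduler, totalCores: Int) extends MiniBackend {
  val launched: ArrayBuffer[String] = ArrayBuffer.empty[String]
  override def reviveOffers(): Unit = {
    launched ++= scheduler.resourceOffer(totalCores)
  }
}
```

With two cores and three submitted tasks, the backend launches the first two and the third stays queued until the next offer.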
SparkContext creates the LocalBackend at line 2603:
val backend = new LocalBackend(sc.getConf, scheduler, 1)
The LocalBackend method most worth noting, at line 123:
override def start() {
  val rpcEnv = SparkEnv.get.rpcEnv
  val executorEndpoint = new LocalEndpoint(rpcEnv, userClassPath, scheduler, this, totalCores)
  localEndpoint = rpcEnv.setupEndpoint("LocalBackendEndpoint", executorEndpoint)
  listenerBus.post(SparkListenerExecutorAdded(
    System.currentTimeMillis,
    executorEndpoint.localExecutorId,
    new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty)))
  launcherBackend.setAppId(appId)
  launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
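Analysis: start() creates a LocalEndpoint and registers it with the RpcEnv under the name "LocalBackendEndpoint", so LocalBackend communicates by passing messages through LocalEndpoint. It also posts a SparkListenerExecutorAdded event to the listener bus.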
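The endpoint idea can be sketched as a mailbox drained by a single thread, so messages are handled one at a time and in arrival order. This is a toy model assuming a LinkedBlockingQueue in place of Spark's RpcEnv; ToyLocalEndpoint and the message types are hypothetical, named only to resemble the kinds of messages a local endpoint receives.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical message types (not Spark's actual classes).
sealed trait LocalMessage
case object ReviveOffers extends LocalMessage
final case class StatusUpdate(taskId: Long, state: String) extends LocalMessage
case object StopExecutor extends LocalMessage

// Hypothetical endpoint: senders enqueue, one worker thread processes in order.
class ToyLocalEndpoint {
  private val mailbox = new LinkedBlockingQueue[LocalMessage]()
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String] // written only by the worker

  private val worker = new Thread(() => {
    var running = true
    while (running) {
      mailbox.take() match {
        case ReviveOffers            => log += "revive"
        case StatusUpdate(id, state) => log += s"task $id -> $state"
        case StopExecutor            => running = false
      }
    }
  })
  worker.start()

  def send(msg: LocalMessage): Unit = mailbox.put(msg)

  // Stop the worker and wait for it, so log is safe to read afterwards.
  def stop(): Unit = {
    send(StopExecutor)
    worker.join()
  }
}
```

Because only the worker thread touches the handler state, no locking is needed inside the message handler, which is the main appeal of this design.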
With TaskSchedulerImpl and LocalBackend created, the next step is initialization.
SparkContext calls the initialization method at line 2616:
scheduler.initialize(backend)
This calls TaskSchedulerImpl.initialize, at line 126:
def initialize(backend: SchedulerBackend) {
  // Keep a reference to the LocalBackend
  this.backend = backend
  // temporarily set rootPool name to empty; create the root pool that queues schedulable items
  rootPool = new Pool("", schedulingMode, 0, 0)
  // Pick the builder that organizes the pool according to the scheduling mode
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
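In FIFO mode the builder places task sets directly in rootPool, which then orders them. A sketch of that ordering rule, assuming that a task set's priority is its job ID and that ties are broken by stage ID; TaskSetEntry and FifoOrdering are hypothetical names, not Spark classes.

```scala
// Hypothetical stand-in for a schedulable task set.
final case class TaskSetEntry(priority: Int, stageId: Int)

object FifoOrdering {
  // True if s1 should run before s2: lower priority (earlier job) first,
  // then lower stage ID when priorities are equal.
  def comesBefore(s1: TaskSetEntry, s2: TaskSetEntry): Boolean =
    if (s1.priority != s2.priority) s1.priority < s2.priority
    else s1.stageId < s2.stageId

  def order(queue: Seq[TaskSetEntry]): Seq[TaskSetEntry] =
    queue.sortWith(comesBefore)
}
```

Under this rule every task set of an earlier job is scheduled before any task set of a later one, which is what distinguishes FIFO from FAIR sharing.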
TaskScheduler creation is now complete.