1. 程式人生 > >spark2原理分析-TaskScheduler(task排程器)概覽

spark2原理分析-TaskScheduler(task排程器)概覽

概述

本文介紹TaskScheduler的基本概念和總體框架。TaskScheduler負責提交Spark應用的任務(task)去執行。

根據前面的分析,我們已經知道job的提交過程,如下圖所示:

在前面的文章中對stage的排程進行了介紹,現在我們介紹task的排程器:TaskScheduler。

任務(Task)排程概述

在Spark中,不同的部署模式和執行任務排程器也不相同,如下:

部署模式 執行模式 task排程器
預設 預設排程 TaskSchedulerImpl
YARN 客戶端模式 YarnScheduler
YARN 叢集模式 YarnClusterScheduler

根據部署模式的master,通過master來獲取對應的TaskScheduler。
流程如下:

  • 獲取叢集管理器
  • 通過叢集管理器來建立TaskScheduler
  • 通過叢集管理器建立排程器後臺物件SchedulerBackend

TaskScheduler建立和啟動

TaskScheduler負責提交Spark應用的任務(task)去執行。

TaskScheduler的啟動總體流程

從上圖可以看出,TaskScheduler在SparkContext初始化時進行建立。在建立TaskScheduler物件時,還會建立一個後臺的服務實體:LocalSchedulerBackend或StandaloneSchedulerBackend。

當呼叫TaskScheduler的start()函式時,TaskScheduler物件會呼叫SchedulerBackend物件的start()函式。

在SparkContext中啟動TaskScheduler

在SparkContext中建立和啟動TaskScheduler的程式碼流程如下:

  • 通過SparkContext的createTaskScheduler函式來建立TaskScheduler物件
  • 呼叫TaskScheduler的start()函式啟動任務排程器

啟動的程式碼如下:

    // Create and start the scheduler
    // 建立一個TaskScheduler物件
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    
    ...

    // 啟動TaskScheduler
    _taskScheduler.start()

TaskScheduler實現合約

TaskScheduler是一個介面,目前實現該介面的類是TaskSchedulerImpl。我們也可以根據自己的需要來實現自己的任務排程類。

該介面定義的成員如下:

成員名 型別 說明
rootPool Pool 排程實體集
schedulingMode SchedulingMode 排程模式,目前支援兩種:FAIR, FIFO
start 函式 啟動TaskScheduler
postStartHook 函式 TaskScheduler初始化後被呼叫
stop 函式 停止TaskScheduler,切斷和cluster的聯絡
submitTasks 函式 提交給定階段的執行任務(作為TaskSet)。當DAGScheduler提交任務(階段的task)時被執行。
cancelTasks 函式 取消給定階段(stage)的所有任務。
killTaskAttempt 函式 嘗試取消一個任務(task)
setDAGScheduler 函式 設定DAGScheduler排程器,根據叢集的master不同而不同。
defaultParallelism 函式 計算預設的並行度。當SparkContext獲取預設並行度時使用。
executorHeartbeatReceived 函式 監聽來自執行器(executor)的心跳資訊
applicationId 函式 獲取該job的應用程式唯一的id
executorLost 處理executor(執行器)失敗事件
workerRemoved 處理worker移除事件
applicationAttemptId Spark應用id,是一個獨一無二的的標識

總結

本文介紹了Spark的TaskScheduler基本功能和啟動流程。