spark2原理分析-Task排程物件Pool原理分析
概述
本文分析Task排程器的Pool排程物件的實現原理。
通過文章spark2原理分析-Task排程物件實現介面(Schedulable)原理分析
我們知道,任務排程器(TaskScheduler)中的排程物件分為兩類:Pool和TaskSetManager。而這兩類排程物件都實現了介面Schedulable。這篇文章著重講解其中的一類排程物件Pool的實現原理。
在Pool排程物件中實現了兩種排程演算法,本文會詳細講解兩種演算法的實現原理。
Pool介紹
Pool是一個Schedulable實體,它包含TaskSetManagers或其Pools的集合。
Pool宣告
Pool的類宣告如下:
private[spark] class Pool(
val poolName: String,
val schedulingMode: SchedulingMode,
initMinShare: Int,
initWeight: Int)
extends Schedulable
Pool的成員分析
- parent
var parent: Pool = null
通過該變數可以把任務形成一個任務樹。
- schedulableQueue
宣告如下:
val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
儲存可排程物件的同步佇列。
- schedulableNameToSchedulable
排程物件名稱和排程物件的對映,這樣可以通過排程名稱獲取到對應的排程物件。
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
- checkSpeculatableTasks
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
該函式用來檢測是否任務的排程時間已經超過了設定的閾值。
- taskSetSchedulingAlgorithm
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm
用來儲存排程演算法物件。目前支援兩種排程演算法:FIFO和FAIR。下一節會詳細講解兩種演算法的實現原理。
- addSchedulable
功能:向排程物件佇列中新增排程物件。
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
- removeSchedulable
override def removeSchedulable(schedulable: Schedulable)
和addSchedulable函式相似,該函式只不從同步佇列中刪除排程物件。
- getSchedulableByName
override def getSchedulableByName(schedulableName: String): Schedulable = {
通過排程物件的name欄位的值來獲取排程物件。可以從同步佇列ConcurrentLinkedQueue中獲取,也可以從schedulableNameToSchedulable對映變數中獲取。
- getSortedTaskSetQueue
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
該函式很重要,通過該函式可以得到按排程演算法排好序的排程物件佇列。
- 執行任務數的統計
def increaseRunningTasks(taskNum: Int)
def decreaseRunningTasks(taskNum: Int)
這兩個函式用來操作runningTasks變數,統計目前正在執行的任務數。同時也會把parent的任務數進行對應的進行增減操作。
排程演算法
介面
private[spark] trait SchedulingAlgorithm {
def comparator(s1: Schedulable, s2: Schedulable): Boolean
}
FIFO排程演算法(FIFOSchedulingAlgorithm)
FIFO排程演算法的實現類是:FIFOSchedulingAlgorithm。該類實現了介面SchedulingAlgorithm中的comparator函式。
該函式先比較排程物件s1和s2的優先順序(priority)欄位的值,若相等再比較s1和s2的stageId值的大小。詳細的實現邏輯如下:
FAIR排程演算法(FairSchedulingAlgorithm)
FAIR排程演算法的實現類是:FairSchedulingAlgorithm。它通過minShare,runningTasks和weight的變數值來比較排程的優先順序。
該演算法的流程如下圖所示:
通過上面的流程圖可知:
在對兩個排程實體進行排程優先順序對比時,若第一個排程實體正在執行的任務數小於第一個排程實體的CPU數量,則任務第一個實體優先順序高,否則認為第二個排程實體的優先順序高。即比較以下兩個值s1Needy和s2Needy的大小:
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
若兩個實體的執行任務數都小於CPU數,則計算每個排程實體的執行任務數和CPU數的比例,並對這個比例進行比較。若第一個的佔有比率小於第二個,則認為第一個排程實體的優先順序高,否則認為第二個的優先順序高。即比較以下兩個值的大小:
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
若這兩個實體的排程比率相等,則計算執行任務數和排程實體的權重(weigth)的比率,若第一個的比率小於第二個,則認為第一個的比第二個優先順序高,否則認為第二個優先順序高。即比較以下兩個值的大小:
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
若相等,則直接比較兩個排程實體的name的ascii碼。
總結
本文描述了可排程物件Pool的實現原理。該物件實現了任務排程演算法,目前支援兩種:FIFO和FAIR。通過該物件可以把整個stage的任務形成一顆樹狀的結構。