1. 程式人生 > >spark2原理分析-Task排程物件Pool原理分析

spark2原理分析-Task排程物件Pool原理分析

概述

本文分析Task排程器的Pool排程物件的實現原理。

通過文章spark2原理分析-Task排程物件實現介面(Schedulable)原理分析
我們知道,任務排程器(TaskScheduler)中的排程物件分為兩類:Pool和TaskSetManager。而這兩類排程物件都實現了介面Schedulable。這篇文章著重講解其中的一類排程物件Pool的實現原理。

在Pool排程物件中實現了兩種排程演算法,本文會詳細講解兩種演算法的實現原理。

Pool介紹

Pool是一個Schedulable實體,它包含TaskSetManagers或其Pools的集合。

Pool宣告

Pool的類宣告如下:

private[spark] class Pool(
    val poolName: String,
    val schedulingMode: SchedulingMode,
    initMinShare: Int,
    initWeight: Int)
  extends Schedulable 

Pool的成員分析

  • parent
 var parent: Pool = null

通過該變數可以把任務形成一個任務樹。

  • schedulableQueue
    宣告如下:
  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]

儲存可排程物件的同步佇列。

  • schedulableNameToSchedulable

排程物件名稱和排程物件的對映,這樣可以通過排程名稱獲取到對應的排程物件。

  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
  • checkSpeculatableTasks
override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean

該函式用來檢測是否任務的排程時間已經超過了設定的閾值。

  • taskSetSchedulingAlgorithm
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm

用來儲存排程演算法物件。目前支援兩種排程演算法:FIFO和FAIR。下一節會詳細講解兩種演算法的實現原理。

  • addSchedulable

功能:向排程物件佇列中新增排程物件。

  override def addSchedulable(schedulable: Schedulable) {
    require(schedulable != null)
    schedulableQueue.add(schedulable)
    schedulableNameToSchedulable.put(schedulable.name, schedulable)
    schedulable.parent = this
  }
  • removeSchedulable
  override def removeSchedulable(schedulable: Schedulable) 

和addSchedulable函式相似,該函式只不從同步佇列中刪除排程物件。

  • getSchedulableByName
override def getSchedulableByName(schedulableName: String): Schedulable = {

通過排程物件的name欄位的值來獲取排程物件。可以從同步佇列ConcurrentLinkedQueue中獲取,也可以從schedulableNameToSchedulable對映變數中獲取。

  • getSortedTaskSetQueue
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]

該函式很重要,通過該函式可以得到按排程演算法排好序的排程物件佇列。

  • 執行任務數的統計
def increaseRunningTasks(taskNum: Int)
def decreaseRunningTasks(taskNum: Int)

這兩個函式用來操作runningTasks變數,統計目前正在執行的任務數。同時也會把parent的任務數進行對應的進行增減操作。

排程演算法

介面

private[spark] trait SchedulingAlgorithm {
  def comparator(s1: Schedulable, s2: Schedulable): Boolean
}

FIFO排程演算法(FIFOSchedulingAlgorithm)

FIFO排程演算法的實現類是:FIFOSchedulingAlgorithm。該類實現了介面SchedulingAlgorithm中的comparator函式。

該函式先比較排程物件s1和s2的優先順序(priority)欄位的值,若相等再比較s1和s2的stageId值的大小。詳細的實現邏輯如下:

FAIR排程演算法(FairSchedulingAlgorithm)

FAIR排程演算法的實現類是:FairSchedulingAlgorithm。它通過minShare,runningTasks和weight的變數值來比較排程的優先順序。

該演算法的流程如下圖所示:

通過上面的流程圖可知:
在對兩個排程實體進行排程優先順序對比時,若第一個排程實體正在執行的任務數小於第一個排程實體的CPU數量,則任務第一個實體優先順序高,否則認為第二個排程實體的優先順序高。即比較以下兩個值s1Needy和s2Needy的大小:

val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2

若兩個實體的執行任務數都小於CPU數,則計算每個排程實體的執行任務數和CPU數的比例,並對這個比例進行比較。若第一個的佔有比率小於第二個,則認為第一個排程實體的優先順序高,否則認為第二個的優先順序高。即比較以下兩個值的大小:

val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)

若這兩個實體的排程比率相等,則計算執行任務數和排程實體的權重(weigth)的比率,若第一個的比率小於第二個,則認為第一個的比第二個優先順序高,否則認為第二個優先順序高。即比較以下兩個值的大小:

val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble

若相等,則直接比較兩個排程實體的name的ascii碼。

總結

本文描述了可排程物件Pool的實現原理。該物件實現了任務排程演算法,目前支援兩種:FIFO和FAIR。通過該物件可以把整個stage的任務形成一顆樹狀的結構。