1. 程式人生 > >深入理解spark-兩種調度模式FIFO,FAIR模式

深入理解spark-兩種調度模式FIFO,FAIR模式

用戶 com 比較 back null ssl max turn initial

前面我們應知道了一個任務提交會由DAG拆分為job,stage,task,最後提交給TaskScheduler,在提交taskscheduler中會根據master初始化taskscheduler和schedulerbackend兩個類,並且初始化一個調度池;

1.調度池比較

根據mode初始化調度池pool

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty 這裏可以看到調度池初始化最小設置為0
rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) } } schedulableBuilder.buildPools() }

FIFO模式

這個會根據spark.scheduler.mode 來設置FIFO or FAIR,默認的是FIFO模式;

FIFO模式什麽都不做,實現默認的schedulerableBUilder方法,建立的調度池也為空,addTasksetmaneger也是調用默認的;

可以簡單的理解為,默認模式FIFO什麽也不做。。

技術分享圖片

FAIR模式

fair模式則重寫了buildpools的方法,讀取默認路徑 $SPARK_HOME/conf/fairscheduler.xml文件,也可以通過參數spark.scheduler.allocation.file設置用戶自定義配置文件。

文件中配置的是

poolname 線程池名

schedulermode 調度模式(FIFO,FAIR僅有兩種)

minshare 初始大小的線程核數

wight 調度池的權重

override def buildPools() {
    var is: Option[InputStream] = None
    try {
      is = Option {
        schedulerAllocFile.map { f =>
          new FileInputStream(f)
        }.getOrElse {
          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
        }
      }

      is.foreach { i => buildFairSchedulerPool(i) }
    } finally {
      is.foreach(_.close())
    }

    // finally create "default" pool
    buildDefaultPool()
  }

同時也重寫了addtaskmanager方法

override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    var poolName = DEFAULT_POOL_NAME
    var parentPool = rootPool.getSchedulableByName(poolName)
    if (properties != null) {
      poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
      parentPool = rootPool.getSchedulableByName(poolName)
      if (parentPool == null) {
        // we will create a new pool that user has configured in app
        // instead of being defined in xml file
        parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
        rootPool.addSchedulable(parentPool)
        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
          poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
      }
    }
    parentPool.addSchedulable(manager)
    logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
  }

這一段邏輯中是把配置文件中的pool,或者default pool放入rootPool中,然後把TaskSetManager存入rootPool對應的子pool;

2.調度算法比較

除了初始化的調度池不一致外,其實現的調度算法也不一致

實現的調度池Pool,在內部實現方法中也會根據mode不一致來實現調度的不同

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
    schedulingMode match {
      case SchedulingMode.FAIR =>
        new FairSchedulingAlgorithm()
      case SchedulingMode.FIFO =>
        new FIFOSchedulingAlgorithm()
    }
  }

FIFO模式

FIFO模式的調度方式很容易理解,比較stageID,誰小誰先執行;

這也很好理解,stageID小的任務一般來說是遞歸的最底層,是最先提交給調度池的;

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}

FAIR模式

fair模式來說的話,稍微復雜一點;

但是還是比較容易看懂,

1.先比較兩個stage的 runningtask使用的核數,其實也可以理解為task的數量,誰小誰的優先級高;

2.比較兩個stage的 runningtask 權重,誰的權重大誰先執行;

3.如果前面都一直,則比較名字了(字符串比較),誰大誰先執行;

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0

    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }

    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }

總結:雖然了解一下spark的調度模式,以前在執行中基本都沒啥用到,沒想到spark還有這樣的隱藏功能。。。

深入理解spark-兩種調度模式FIFO,FAIR模式