1. 程式人生 > >spark作業排程

spark作業排程

一、排程分類

排程分為兩種,一是應用之間的,二是應用內部作業的。

(一)應用之間

我們前面幾章有說過,一個spark-submit提交的是一個應用,不同的應用之間是有排程的,這個就由資源分配者來排程。如果我們使用Yarn,那麼就由Yarn來排程。排程方式的配置就在$HADOOP_HOME/etc/hadoop/yarn-site.xml

<property>
<name>yarn.resourcemanager.scheduler.class</name> 
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
</property>

(二)應用內部

參考《Spark基礎入門(三)--------作業執行方式》可以看到,SparkContext底層會觸發呼叫runJob的方法阻塞式的提交job,提交job的執行緒會處於阻塞狀態,同一個執行緒中,後面的job需要等待前面job完成才能提交。但當多執行緒執行時,則可以併發提交Job

例如SparkStreaming執行併發提交時,可以看到一個SparkStreaming的專案中多個job在同時跑:

 

再例如Thriftserver,多個使用者通過beeline連線Thriftserver提交自己的查詢,所有的查詢都是並行執行的:

 

我們重點介紹應用內部的排程,排程方式的配置在

$SPAKR_HOME/conf/spark-defaults.conf

spark.scheduler.mode = FIFO/FAIR


二、排程原理

結合《Spark基礎入門(三)--------作業執行方式

(一)作業提交與排程池的建立


1. DAGScheduler採取的生產者消費者模型,存在一個Event佇列,使用者和TaskScheduler會生產event到這個佇列中,DAGScheduler中會有一個Daemon執行緒去消費這些event併產生對應的處理。DAGScheduler可以處理的Event包括:JobSubmittedCompletionEventExecutorLost

TaskFailedStopDAGScheduler

2. DAGScheduler 在接收到JobSubmittedEvent之後,會首先計算出其DAG圖,然後劃分Stage,最後提交TaskSetTaskScheduler(通過呼叫TaskSchedulersubmitTasksTaskScheduler還有cancelTasks的方法)

3. TaskSchedulersubmitTasks方法最後會建立TaskManager的例項,由它去管理裡面的TaskSet

4. SparkContext是多執行緒安全的,可以有多個執行緒提交Job,這個Job也就是sparkAction

5. 每個執行緒提交Job時,是按Stage為最小單位來提交的,提交一個stageTaskSet(一堆task任務)有一個TaskSetManager會來管理TaskSet一個TaskSet對應一個TaskSetManager

6. TaskScheduler在初始化時,會建立一個Pool,用於排程;還會建立SchedulerBuilder,會去構造剛剛這個Pool

7. SchedulerBuilderTaskSchedulerImpl類中的定義如下,SchedulerBuilder會根據使用者設定的排程模式(比如FIFO或者Fair)呼叫其buildPools方法,將下面的TaskSetManager按照一定的組織形式放到Pool中。上圖綠色框圖圈出來的部分。比如使用的FIFO,則以FIFOSchedulableBuilder類來build pool,如果使用FAIR,則使用FairChedulableBuilder

var schedulableBuilder: SchedulableBuilder = null
...
  def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
  schedulingMode match {
    case SchedulingMode.FIFO =>
      //rootPool包含了一組TaskSetManager
      new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          //rootPool包含了一組Pool樹,這棵樹的葉子節點都是TaskSetManager
          new FairSchedulableBuilder(rootPool, conf)
        }
     }
     schedulableBuilder.buildPools() //在FIFO中的實現是空
}

(二)作業排程

上述都是基礎工作,pool和排程物件建立聯絡之後,才開始真正的排程。

1. 排程由TaskScheduler進行,只有在有新的計算資源時才會進行作業排程。

2. TaskScheduler後面還有SchedulerBackendSchedulerBackend會負責與Executor互動。

3. SchedulerBackend會呼叫makeOffers,觸發TaskScheduler呼叫resourceOffers方法。resourceOffers方法會根據當前的設定,選用一個排程演算法,進行作業排程。

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
case SchedulingMode.FAIR =>
  new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
  new FIFOSchedulingAlgorithm()
}
}

4. 有兩種觸發SchedulerBackend呼叫makeOffers的場景:

(1) 定時任務:SchedulerBackend在啟動時會建立DriverEndPointDriverEndPoint中有定時任務,一定時間(spark.scheduler.revive.interval,預設為1s)進行一次排程(給自身傳送ReviveOffers訊息, 進行呼叫makeOffers進行排程)

(2)資源釋放:當Executor執行完成已分配任務時,會向Driver傳送StatusUpdate訊息,表明一個Executor資源已經釋放,則呼叫makeOffers(executorId)方法。

三、排程演算法

(一)FIFO(First in first out)

三個執行緒提交三個Job,則按照順序,先執行Job1,執行結束之後再執行Job2,然後再執行Job3

1. buildPools演算法

對於FIFO模式的排程,rootPool管理的直接就是TaskSetManagerSchedulerBuilderbuildPools方法會遍歷所有的TaskSetManager,然後將他們直接掛在rootPool下面。

FIFO排程模式只有一層,會在葉子節點TaskSetManager中選擇排程哪一個

/**FIFO模式下的Pools的構建/ 
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool) 
extends SchedulableBuilder with Logging { 
override def buildPools() { 
// 實際什麼都不做 
} 
//新增下級排程實體的時候,直接新增到rootPool 
override def addTaskSetManager(manager: Schedulable, properties: Properties) { 
rootPool.addSchedulable(manager) 
} 
}

2. 排程演算法

/**
 * FIFO排序的實現,主要因素是優先順序、其次是對應的Stage
 * 優先順序高的在前面,優先順序相同,則靠前的stage優先
 */
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    //優先順序越小優先順序越高
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
        //如果優先順序相同,那麼Stage靠前的優先
        val stageId1 = s1.stageId
        val stageId2 = s2.stageId
        res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
        true
    } else {
       false
    }
 }
}

首先比較優先順序

如果優先順序相同,就比較Stage的大小。

FIFO中,優先順序即是JobID。而JobID是順序生成的,所以也就是先生成的JobID比較小,參考程式碼可以看到優先順序JobID)越小,越先排程

對同一個作業(Job)來說越先生成的Stage,其StageId越小,

有依賴關係的多個Stage之間,DAGScheduler會控制Stage是否會被提交到排程佇列中(若其依賴的Stage未執行完前,此Stage不會被提交),其排程順序可通過此來保證。但若某Job中有兩個無入度的Stage的話,則先排程StageId小的Stage。

3. 實驗

 

這個演算法對外表現出來就是一個Job1先執行完了之後下一個Job2,那麼如果Job1執行需要3個小時,而Job2執行只需要1分鐘,結果Job2從提交到結束會需要3小時一分鐘。非常不友好、不靈活。

(二)FAIR

首先配置$SPAKR_HOME/conf/spark-defaults.conf

spark.scheduler.mode    =  FAIR

1. buildPools演算法

/**FAIR模式下的Pools的構建*/
private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
    extends SchedulableBuilder with Logging {
    ....省略程式碼...
     override def buildPools() {
         ...省略...
        buildDefaultPool()
     }
 
     private def buildDefaultPool() {
        if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
        val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
            DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
        rootPool.addSchedulable(pool)
        ......
 }
}

 

模型如上,Fair模型的排程是兩級排程。rootPool下面管理的是其他pool,下面的pool才去管理TaskManager

配置方式:

1)新增池子

新增$SPAKR_HOME/conf/fairscheduler.xml可以設定排程的多個池子,如果不設定,則預設底下只有一個defaultPool池子。

<?xml version="1.0"?>
<allocations>
  <pool name="default">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
  <pool name="pool1">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>

2)配置TaskSetManager與池子之間的關係

執行緒1提交了一個action,這個action觸發了一個jobId1job,會交給TaskSetManager1來管理。

在提交這個action之前,程式碼中設定spark.scheduler.pool:

SparkContext.setLocalProperty(“spark.scheduler.pool”,”pool_name_1”)

注意這裡的setLocalProperty,筆者認為應該是執行緒私有的物件

如果不加設定,jobs會提交到default排程池中。由於排程池的使用是Thread級別的,只能通過具體的SparkContext設定local屬性(即無法在配置檔案中通過引數spark.scheduler.pool來設定,因為配置檔案中的引數會被載入到SparkConf物件中)。所以需要使用指定排程池的話,需要在具體程式碼中通過SparkContext物件sc來按照如下方法進行設定:
sc.setLocalProperty("spark.scheduler.pool", "test") 
設定該引數後,在該thread中提交的所有job都會提交到test Pool中。
如果接下來不再需要使用到該test排程池,
sc.setLocalProperty("spark.scheduler.pool", null)

我們將不同執行緒提交的job給隔離到不同的池子裡了

2. 排程演算法

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    //最小共享,可以理解為執行需要的最小資源即CPU核數
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    //執行的任務的數量
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
//執行中的任務的數量與最小CPU核數比較,如果小於,則說明處於飢餓狀態
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
 
     //飢餓程度越大(runningTask遠小於minshare),算出來的數值越小
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
 
     //權重越高,算出來的數值越小
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
 
     //飢餓的優先
    if (s1Needy && !s2Needy) {
        return true
    } else if (!s1Needy && s2Needy) {
        return false
    } else if (s1Needy && s2Needy) {
        //都處於捱餓狀態則飢餓程度越大的優先
        compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
        //都不捱餓,則權重高的優先
        compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
   }
 
  if (compare < 0) {//小於0時,返回true
        true
   } else if (compare > 0) {//大於0時,返回false
    false
   } else {
  //如果都一樣,那麼比較名字,按照字母順序比較,不考慮長度,所以名字比較重要
    s1.name < s2.name
  }
 }
}

上述演算法總結下來就是:

1.飢餓的優先(minShare)

2.都處於捱餓狀態則飢餓程度越大的優先(running/minShare越小的優先)

3.都不捱餓,則權重程度高的優先(running/weight越小的優先)

4.算出來的值相同時,則比較名字(按照字母順序比較)

3. 案例分析

20核分配

三個池子hello(minshare:5/weight:15), apple(minshare:2/weight:5), pool(minshare:3/weight:1)

初始狀態:0<5                      0<2                      0<3

全部飢餓

飢餓程度  0%                       0%                       0%

按名字分配                          1

飢餓程度 0%                        1/2(50%)                  0%

按名字                                                      1

飢餓程度 0%                        1/2(50%)                  1/3(33.3%)

按飢餓程度1

飢餓程度  1/5(20%)                  1/2(50%)                  1/3(33.3%)

按飢餓程度1

飢餓程度  2/5(40%)                  1/2(50%)                  1/3(33.3%)

按飢餓程度2/5(40%)                  1/2(50%)                  2/3(66.7%)

按飢餓程度3/5(60%)                 1/2(50%)                  2/3(66.7%)

按飢餓程度3/5(60%)                 2/2(100%)                  2/3(66.7%)

按飢餓程度4/5(80%)                  2/2(100%)                  2/3(66.7%)

按飢餓程度4/5(80%)                  2/2(100%)                  3/3(100%)

按飢餓     5/5(100%)                 2/2(100%)                  3/3(100%)

此時已經分配10個核

全部不飢餓,權重程度 5/15(33.3%)     2/5(40%)                     3/1(300%)

按權重程度6/15(40%)                 2/5(40%)                     3/1(300%)

按名字     6/15(40%)                 3/5(60%)                     3/1(300%)

按權重程度 7/15(46.7%)                3/5(60%)                     3/1(300%)

按權重程度 8/15(53.3%)                3/5(60%)                     3/1(300%)

按權重程度 9/15(60%)                 3/5(60%)                     3/1(300%)

按名字     9/15(60%)                 4/5(80%)                     3/1(300%)

按權重程度 10/15(66.7%)               4/5(80%)                     3/1(300%)

按權重程度 11/15(73.3%)               4/5(80%)                     3/1(300%)

按權重程度 12/15(80%)                4/5(80%)                     3/1(300%)

按名字     12/15(80%)                5/5(100%)                     3/1(300%)

此時20核全部分配完

如果使用者配置一個指定排程池權重為2, 那麼這個排程池將會獲得相對於權重為1的排程池2倍的資源

4. 池子內部的排程

第一小層是Pool(資源池)間的公平排程,第二小層是Pool內的。注意,Pool內部排程預設是FIFO的,需要設定{spark_base_dir}/conf/fairscheduler.xml,針對具體的Pool設定排程規則

<pool name="default">

    <schedulingMode>FAIR</schedulingMode>

    <weight>1</weight>

    <minShare>0</minShare>

</pool>

pool內已經沒有minShareweight了,所以筆者猜測poolminShare全是0weight全是1。然後也就是公平的平均分配所有的資源。

四、TaskSetManager內部排程

當資源已經分配給一個taskSetManager之後,再就是執行任務內部的排程邏輯。因為分配的資源是某個executor上的,每個Task又有自己prefer的節點(為了計算的本地性),他們之間可能不是完全的匹配。

例如資源executor(機器c上的)排程給了一個taskSetManager,而taskSetManager中此時只有a,b兩個task(它們prefer的節點是a,b),那麼如果此時將c資源給a task,那麼a可能計算就是rack(機架中的),然後很短時間內,又有一個a資源排程過來,而此時只能把它給b task。而實際上最佳的方式應該是把a資源給a taskc資源給b task

所以這裡有一個等待機制,包括以下引數:spark.locality.wait.processspark.locality.wait.nodespark.locality.wait.rackTaskSetManager會根據等待時間降低自己的要求。(從process本地程序---->node本地節點---->rack同機架上---->any任意匹配)。這種等待機制會帶來一定延遲,但如果這種調整有效那麼也會節約很多計算時間(比如上例中,最後a上計算a task會比c上計算a task快很多)。

五、Thriftserver的排程

想要thriftserver達到SQL級別的公平排程,需要配置三個配置檔案:yarn-site.xmlspark-defaults.conffairscheduler.xml。由於thriftserverSQL沒有按照不同使用者區分多個Pool,所以其實並不能實現不同權重和minshare的設定,只能達到完全公平的分配(也就是(三)4)中提到的池子內的排程)。

但通過修改thriftserver的原始碼,可以實現不同sql分配到不同的池子裡面,就可以實現sql級別的排程了。但池子必須提前配置好。