1. 程式人生 > 實用技巧 >【Yarn原始碼分析】FairScheduler資源排程

【Yarn原始碼分析】FairScheduler資源排程

一、Yarn 資源排程方式

資源排程方式確定了當任務提交到叢集,如何為其分配資源執行任務。在 FairScheduler 中提供了兩種資源排程方式:心跳排程和連續排程。

  • 心跳排程方式:NodeManager 向 ResourceManager 彙報了自身資源情況(比如,當前可用資源,正在使用的資源,已經釋放的資源),這個 RPC 會觸發 ResourceManager 呼叫 nodeUpdate() 方法,這個方法為這個節點進行一次資源排程,即,從維護的 Queue 中取出合適的應用的資源請求(合適 ,指的是這個資源請求既不違背佇列的最大資源使用限制,也不違背這個 NodeManager 的剩餘資源量限制)放到這個NodeManager上執行。這種排程方式一個主要缺點就是排程緩慢,當一個NodeManager即使已經有了剩餘資源,排程也只能在心跳傳送以後才會進行,不夠及時。
  • 連續排程方式:由一個獨立的執行緒 ContinuousSchedulingThread 負責進行持續的資源排程,與 NodeManager 的心跳是非同步進行的。即不需要等到 NodeManager 發來心跳才開始資源排程。

無論是 NodeManager 心跳時觸發排程,還是通過 ContinuousSchedulingThread 進行實時、持續觸發,他們對某個節點進行一次排程的演算法和原理是公用的,都是通過 synchronized void attemptScheduling(FSSchedulerNode node) 來在某個節點上進行一次排程,方法的的引數代表了準備進行資源分配的節點。兩種觸發機制不同的地方只有兩個:

  • 排程時機:心跳排程僅僅發生在收到了某個 NodeManager 的心跳資訊的情況下,持續排程則不依賴與NodeManager的心跳通訊,是連續發生的,當心跳到來,會將排程結果直接返回給 NodeManager;
  • 排程範圍:心跳排程機制下,當收到某個節點的心跳,就對這個節點且僅僅對這個節點進行一次排程,即誰的心跳到來就觸發對誰的排程,而持續排程的每一輪,是會遍歷當前叢集的所有節點,每個節點依次進行一次排程,保證一輪下來每一個節點都被公平的排程一次;

在叢集環境中,連續排程預設不開啟,只有設定 yarn.scheduler.fair.continuous-scheduling-enabled 引數為 true,才會啟動該執行緒。連續排程現在已經不推薦了,因為它會因為鎖的問題,而導致資源排程變得緩慢。可以使用 yarn.scheduler.assignmultiple 引數啟動批量分配功能,作為連續排程的替代。

二、Yarn 排程流程

本文的排程流程主要介紹心跳排程的方式,下圖是 Yarn 心跳排程的主要流程。

Yarn 排程流程圖

2.1 名詞解釋

  • ResrouceSacheduler 是YARN 的排程器,負責 Container 的分配。下面主要是以 FairScheduler 排程器為例。
  • AsyncDispatcher 是單執行緒的事件分發器,負責向排程器傳送排程事件。
  • ResourceTrackerService 是資源跟蹤器,主要負責接收處理 NodeManager 的心跳資訊。
  • ApplicationMasterService 是作業的 RPC 服務,主要負責接收處理作業的心跳資訊。
  • AppMaster 是作業的程式控制器,負責跟 YARN 互動獲取/釋放資源。

2.2 排程流程

YARN 的資源排程是非同步進行的,NM 心跳發生時,排程器 ResourceScheduler 根據作業需求將 Container 資源分配給作業後,不會立即通知 AM,而是等待 AM 註冊後通過心跳方式來主動獲取。YARN 的整個排程流程可以概括為以下幾個步驟:

  1. NM 節點通過心跳方式向 RM 彙報節點資源資訊(包括當前可用資源、正在使用的資源、已經釋放的資源)。
  2. ResourceTrackerService 服務收到 NM 的心跳事件,將 NODE_UPDATE 事件交給 中央排程器 AsyncDispatcher 處理;
  3. AsyncDispatcher 根據事件型別將請求轉發給 ResourceScheduler 處理,ResourceScheduler 則按照一定的排程策略(佇列層級排程)將 NM 的資源分配到 Container,並將 Container 儲存在資料結構中;
  4. ResourceScheduler 針對作業分配的第一個 Container 用於啟動作業的 AM 程序;
  5. AM 啟動後,通過 ApplicationMasterService 定期向 RM 發生資源請求心跳,領取之前記錄在 RM 中分配給自己的 Container 資源;
  6. AM 向 NM 傳送啟動 Container 的命令,將收到的 Container 在 NM 上啟動執行。

其中,Yarn 分配策略確定了在 NM 發生心跳時,如何在所有佇列中選擇合適的 APP 資源請求以為其分配資源。從上圖佇列層級結構可以看出一次 Container 的分配流程:每次分配從 root 節點開始,先從佇列中選擇合適的葉子佇列,然後從佇列的 APP 中選擇合適的 APP,最後選擇出該 APP 中合適的 Container 為其分配資源執行任務,選擇過程如下:

  • 選擇佇列 (排序)。從根佇列開始,使用深度優先遍歷演算法,從根佇列開始,依次遍歷子佇列找出資源佔用率最小的子佇列。若子佇列為葉子佇列,則選擇該佇列;若子佇列為非葉子佇列,則以該子佇列為根佇列重複前面的過程直到找到一個資源使用率最小的葉子佇列為止。
  • 選擇應用(排序)。在Step1中選好了葉子佇列後,取該佇列中排序最靠前的應用程式(排序邏輯可以根據應用程式的資源請求量、提交時間、作業名)。
  • 選擇 Container(排序)。在 Step2中選好應用程式之後,選擇該應用程式中優先順序最高的 Container。對於優先順序相同的 Containers,優選選擇滿足本地性的 Container,會依次選擇 node local、rack local、no local。

三、FairScheduler 資源分配原始碼分析

3.1 NM 心跳上報

NM 中負責心跳的類是NodeStatusUpdater 型別的成員變數 nodeStatusUpdater,在 NM 呼叫 serviceInit() 方法時被建立:

//程式碼:org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  protected void serviceInit(Configuration conf) throws Exception {
    ...  // 省略
    nodeStatusUpdater =
        createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
    ...  // 省略
  }

  protected NodeStatusUpdater createNodeStatusUpdater(Context context,
      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
      metrics);
  }

nodeStatusUpdater 在建立時初始化例項 NodeStatusUpdaterImpl,它是真正負責與 RM 通訊的類,其中 serviceStart() 方法中會進行 NM 註冊和心跳。

//位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  @Override
  protected void serviceStart() throws Exception {

    // NodeManager is the last service to start, so NodeId is available.
    this.nodeId = this.context.getNodeId();
    this.httpPort = this.context.getHttpPort();
    this.nodeManagerVersionId = YarnVersionInfo.getVersion();
    try {
      // Registration has to be in start so that ContainerManager can get the
      // perNM tokens needed to authenticate ContainerTokens.
      this.resourceTracker = getRMClient();
      registerWithRM();         // NM向RM註冊
      super.serviceStart();
      startStatusUpdater();     // 獨立執行緒進行NM心跳上報
    } catch (Exception e) {
      String errorMessage = "Unexpected error starting NodeStatusUpdater";
      LOG.error(errorMessage, e);
      throw new YarnRuntimeException(e);
    }
  }

NM 向 RM 註冊邏輯直接跳過,重點看一下心跳邏輯,首先啟動心跳執行緒:

//位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  protected void startStatusUpdater() {
    statusUpdaterRunnable = new Runnable() { ... };
    statusUpdater =
        new Thread(statusUpdaterRunnable, "Node Status Updater");
    statusUpdater.start();
  }

接著來看看statusUpdaterRunnable 執行緒如何進行心跳上報:

//位置:org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    statusUpdaterRunnable = new Runnable() {
      @Override
      @SuppressWarnings("unchecked")
      public void run() {
        int lastHeartBeatID = 0;
        while (!isStopped) {    // 在被終止前死迴圈的跑
          // Send heartbeat
          try {
            NodeHeartbeatResponse response = null;
            NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
            
            // 構建 request 請求資訊
            NodeHeartbeatRequest request =
                NodeHeartbeatRequest.newInstance(nodeStatus,
                  NodeStatusUpdaterImpl.this.context
                    .getContainerTokenSecretManager().getCurrentKey(),
                  NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
                    .getCurrentKey());

            // 重點:這裡向 RM 傳送心跳 RPC 請求,並得到返回結果 response
            response = resourceTracker.nodeHeartbeat(request);
            //get next heartbeat interval from response
            nextHeartBeatInterval = response.getNextHeartBeatInterval();
            updateMasterKeys(response);

            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
              // 處理 RM 返回的結果,包括停止執行和重新註冊
            }
            ...  // 省略
          }
        }
      }
    };

至此,NM 已經通過NodeStatusUpdaterImpl 類向 RM 傳送了心跳請求,那 RM 又如何處理該心跳請求呢?我們接著分析。

3.2 RM 接收心跳

NM 與 RM 通訊的介面是通過ResourceTrackerService 服務來實現。直接來看看 NM 呼叫 nodeHeartbeat() 方法傳送過來的請求。

//位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  @Override
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {

    NodeStatus remoteNodeStatus = request.getNodeStatus();
    /**
     * 處理心跳的流程:
     * 1. 判斷是否是合法的 node(即是否被拉黑過)
     * 2. 判斷是否是一個註冊過的 node
     * 3. 判斷這個心跳是否是重複的心跳
     * 4. 傳送 NM 的狀態給 RMNodeStatusEvent 事件處理器
     */

    ... // 1-3 步跳過

    // 4. Send status to RMNode, saving the latest response.
    RMNodeStatusEvent nodeStatusEvent =
        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
          remoteNodeStatus.getContainersStatuses(),
          remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse);
    if (request.getLogAggregationReportsForApps() != null
        && !request.getLogAggregationReportsForApps().isEmpty()) {
      nodeStatusEvent.setLogAggregationReportsForApps(request
        .getLogAggregationReportsForApps());
    }
    this.rmContext.getDispatcher().getEventHandler().handle(nodeStatusEvent);

    return nodeHeartBeatResponse;
  }

心跳處理過程的關鍵是第 4 步,它會通過中央排程器 AsyncDispatcher向 RM 傳送 RMNodeStatusEvent 事件,那這個事件是由誰來處理的呢?在 Yarn 這種事件處理邏輯很常見,關鍵點是要看事件對應的處理器是如何註冊的。上面的RMNodeStatusEvent 事件處理器繼承自 RMNodeEvent,在 RM 的註冊處理器程式碼如下。

//位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
      // Register event handler for RmNodes
      rmDispatcher.register(
          RMNodeEventType.class, new NodeEventDispatcher(rmContext));

其中RMNodeStatusEvent 事件是交由 NodeEventDispatcher 排程器處理,處理的 handle() 方法如下:

//位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    public void handle(RMNodeEvent event) {
      NodeId nodeId = event.getNodeId();
      RMNode node = this.rmContext.getRMNodes().get(nodeId);
      if (node != null) {
        try {
          ((EventHandler<RMNodeEvent>) node).handle(event);
        } catch (Throwable t) {
          LOG.error("Error in handling event type " + event.getType()
              + " for node " + nodeId, t);
        }
      }
    }

這裡會呼叫 RMNode 的handle() 方法,RMNode 是一個介面類,實現類為 RMNodeImpl,對應 handle() 方法如下:

//位置:org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
public void handle(RMNodeEvent event) {
  LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
  try {
    writeLock.lock();
    NodeState oldState = getState();
    try {
       stateMachine.doTransition(event.getType(), event);
    } catch (InvalidStateTransitonException e) {
      LOG.error("Can't handle this event at current state", e);
      LOG.error("Invalid event " + event.getType() + 
          " on Node  " + this.nodeId);
    }
    if (oldState != getState()) {
      LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
               + getState());
    }
  }
  
  finally {
    writeLock.unlock();
  }
}

這裡就涉及到 RMNodeImpl 的狀態機,由於 RMNodeStatusEvent 事件型別是RMNodeEventType.STATUS_UPDATE,狀態機中對這個事件的處理有三種情況:

  • 從 RUNNING 到 RUNNING、UNHEALTHY,呼叫StatusUpdateWhenHealthyTransition;
  • 從 DECOMMISSIONING 到 DECOMMISSIONING、DECOMMISSIONED,呼叫 StatusUpdateWhenHealthyTransition;
  • 從 UNHEALTHY 到 UNHEALTHY、RUNNING,呼叫 StatusUpdateWhenUnHealthyTransition;
//位置:org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
      //Transitions from RUNNING state
      .addTransition(NodeState.RUNNING,
          EnumSet.of(NodeState.RUNNING, NodeState.UNHEALTHY),
          RMNodeEventType.STATUS_UPDATE,
          new StatusUpdateWhenHealthyTransition())
      //Transitions from DECOMMISSIONING state
      .addTransition(NodeState.DECOMMISSIONING,
          EnumSet.of(NodeState.DECOMMISSIONING, NodeState.DECOMMISSIONED),
          RMNodeEventType.STATUS_UPDATE,
          new StatusUpdateWhenHealthyTransition())
      //Transitions from UNHEALTHY state
      .addTransition(NodeState.UNHEALTHY,
          EnumSet.of(NodeState.UNHEALTHY, NodeState.RUNNING),
          RMNodeEventType.STATUS_UPDATE,
          new StatusUpdateWhenUnHealthyTransition())

這裡選擇最常見的狀態轉換,從 RUNNING 到 RUNNING,檢視被呼叫的StatusUpdateWhenHealthyTransition 狀態機的 transition() 方法:

//位置:org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {

      RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;

      // Switch the last heartbeatresponse.
      rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();

      NodeHealthStatus remoteNodeHealthStatus =
          statusEvent.getNodeHealthStatus();
      rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
      rmNode.setLastHealthReportTime(
          remoteNodeHealthStatus.getLastHealthReportTime());
      NodeState initialState = rmNode.getState();
      boolean isNodeDecommissioning =
          initialState.equals(NodeState.DECOMMISSIONING);
      
      ... // 跳過 unhealthy 和 decommsission 的判斷邏輯

      rmNode.handleContainerStatus(statusEvent.getContainers());

      List<LogAggregationReport> logAggregationReportsForApps =
          statusEvent.getLogAggregationReportsForApps();
      if (logAggregationReportsForApps != null
          && !logAggregationReportsForApps.isEmpty()) {
        rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
      }

      if(rmNode.nextHeartBeat) {
        rmNode.nextHeartBeat = false;
        // 重點:向 RM 傳送一個 NodeUpdateSchedulerEvent 事件
        rmNode.context.getDispatcher().getEventHandler().handle(
            new NodeUpdateSchedulerEvent(rmNode));
      }

      // Update DTRenewer in secure mode to keep these apps alive. Today this is
      // needed for log-aggregation to finish long after the apps are gone.
      if (UserGroupInformation.isSecurityEnabled()) {
        rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
          statusEvent.getKeepAliveAppIds());
      }

      return initialState;
    }
  }

這裡關鍵的邏輯是向 RM 傳送了一個NodeUpdateSchedulerEvent 事件,那這個事件又是誰處理的呢?NodeUpdateSchedulerEvent 繼承自SchedulerEvent,SchedulerEvent在RM中註冊的處理器如下:

//位置:org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  // 註冊 SchedulerEventDispatcher
  schedulerDispatcher = createSchedulerEventDispatcher();
  addIfService(schedulerDispatcher);
  rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
  
  // 註冊方法
  protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
    return new SchedulerEventDispatcher(this.scheduler);
  }

其中 scheduler 物件是根據配置 yarn.resourcemanager.yarn.resourcemanager 指定的類生成的物件,這裡使用 org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler。那就進入到 FairScheduler 的 handle() 方法,這裡只看 NODE_UPDATE 事件的處理邏輯,其他的先省略。

//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  @Override
  public void handle(SchedulerEvent event) {
    long start = getClock().getTime();
    switch (event.getType()) {
    case NODE_ADDED:  // 省略
    case NODE_REMOVED: // 省略
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      fsOpDurations.addHandleNodeUpdateEventDuration(getClock().getTime() - start);
      break;
    case APP_ADDED: // 省略
    case APP_REMOVED: // 省略
    case NODE_RESOURCE_UPDATE: // 省略
    case APP_ATTEMPT_ADDED: // 省略
    case APP_ATTEMPT_REMOVED: // 省略
    case CONTAINER_EXPIRED: // 省略
    case CONTAINER_RESCHEDULED: // 省略
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }

由於 NM 的心跳事件是RMNodeEventType.STATUS_UPDATE,可以得知這裡處理的事件型別為SchedulerEventType.NODE_UPDATE,進入NODE_UPDATE處理邏輯。

//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
private void nodeUpdate(RMNode nm) {
    try {
      writeLock.lock();
      long start = getClock().getTime();
      if (LOG.isDebugEnabled()) {
        LOG.debug("nodeUpdate: " + nm +
            " cluster capacity: " + getClusterResource());
      }
      eventLog.log("HEARTBEAT", nm.getHostName());
      FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());

      // Containe 狀態更新:處理新執行或者執行完成的 Container

      // 判斷 NM 是否是 DECOMMISSIONING 狀態

      // 核心排程入口,無論是否開啟連續排程入口都是 attemptScheduling(node) 方法
      if (continuousSchedulingEnabled) {
        if (!completedContainers.isEmpty()) {
          attemptScheduling(node);
        }
      } else {
        attemptScheduling(node);
      }

      long duration = getClock().getTime() - start;
      fsOpDurations.addNodeUpdateDuration(duration);
    } finally {
      writeLock.unlock();
    }
  }

看看核心排程入口,這裡獲取了一個FSSchedulerNode 例項,並嘗試進行排程。

//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  synchronized void attemptScheduling(FSSchedulerNode node) {
    // 檢查是否是有效的 node

    // Assign new containers...
    // 1. 檢查是否有資源預留的應用
    // 2. 沒有預留則進行排程分配新的 Container

    boolean validReservation = false;
    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    if (reservedAppSchedulable != null) {
      validReservation = reservedAppSchedulable.assignReservedContainer(node);
    }
    if (!validReservation) {
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      while (node.getReservedContainer() == null) {
        boolean assignedContainer = false;
        // 重點:核心分配邏輯開始,從 ROOT 佇列開始排程
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        if (!assignment.equals(Resources.none())) {
          assignedContainers++;
          assignedContainer = true;
          Resources.addTo(assignedResource, assignment);
        }
        if (!assignedContainer) { break; }
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
    }
    updateRootQueueMetrics();
  }

分配 Container 是從 ROOT 佇列開始,這裡呼叫 queueMgr.getRootQueue() 方法找到 ROOT 佇列,然後呼叫 assignContainer(node) 方法。

//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();

    // 如果超過了佇列的 maxShare 則直接返回
    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    TreeSet<FSQueue> sortedChildQueues = new TreeSet<>(policy.getComparator());

    /*
     * 這裡對所有葉子佇列進行排序,有兩個情況需要考慮下:
     * 1. 新增加一個 queue,不影響結果的正確性,下次會處理新 queue
     * 2. 刪除一個 queue,最好處理一下以不對該 queue 進行分配,不過目前沒有處理,也沒有影響
     */
    readLock.lock();
    try {
        sortedChildQueues.addAll(childQueues);
        for (FSQueue child : sortedChildQueues) {
        assigned = child.assignContainer(node);
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
      }
    } finally {
      readLock.unlock();
    }
    return assigned;
  }

這裡是 FSParentQueue 父佇列的 assignContainer() 邏輯,對所有孩子節點進行遍歷,遞迴呼叫該該方法,呼叫過程有兩種情況:

  • 如果孩子節點是 FSParentQueue 父佇列,則遞迴進入FSParentQueue 類相同的邏輯中。
  • 如果孩子節點是 FSLeafQueue 葉子佇列,則進入到下一步的呼叫邏輯。
//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = none();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
          getName() + " fairShare: " + getFairShare());
    }

    // 檢查是否超過佇列的 maxShare 限制
    if (!assignContainerPreCheck(node)) {
      return assigned;
    }

    // 遍歷葉子節點所有有資源需求的 APP,並對其嘗試分配 Container
    for (FSAppAttempt sched : fetchAppsWithDemand(true)) {
      if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
        continue;
      }
      assigned = sched.assignContainer(node);
      if (!assigned.equals(none())) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assigned container in queue:" + getName() + " " +
              "container:" + assigned);
        }
        break;
      }
    }
    return assigned;
  }

這裡獲取葉子節點的 APP 呼叫了 fetchAppsWithDemand() 方法,該方法主要是對該佇列所有 APP 進行遍歷,找到真正有資源需求的 APP,過濾掉沒有資源的 APP。

//位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
  private TreeSet<FSAppAttempt> fetchAppsWithDemand(boolean assignment) {
    TreeSet<FSAppAttempt> pendingForResourceApps =
        new TreeSet<>(policy.getComparator());
    readLock.lock();
    try {
      for (FSAppAttempt app : runnableApps) {
        // 判斷 APP 是否有資源需求,即有資源還沒有得到滿足
        if (!Resources.isNone(app.getPendingDemand()) &&
            (assignment || app.shouldCheckForStarvation())) {
          pendingForResourceApps.add(app);
        }
      }
    } finally {
      readLock.unlock();
    }
    return pendingForResourceApps;
  }

獲取到葉子節點有資源需求的 APP 後,呼叫FSAppAttempt 類的例項 assignContainer(node) 方法,進行接下來的分配邏輯。

// 位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  @Override
  public Resource assignContainer(FSSchedulerNode node) {
    // 這裡主要檢查佇列已使用資源是否達到了用於執行 AM 的資源限制
    if (isOverAMShareLimit()) {
      List<ResourceRequest> ask = appSchedulingInfo.getAllResourceRequests();
      Resource amResourceRequest = Resources.none();
      if (!ask.isEmpty()) {
        amResourceRequest = ask.get(0).getCapability();
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("AM resource request: " + amResourceRequest
            + " exceeds maximum AM resource allowed, "
            + getQueue().dumpState());
      }

      return Resources.none();
    }
    return assignContainer(node, false);
  }

這裡主要檢查佇列已使用資源是否達到了用於執行 AM 的資源限制,如果沒有的話,則繼續排程。

// 位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  private Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("Node offered to app: " + getName() + " reserved: " + reserved);
    }

    // 對 APP 的所有 ResourceRequest 按照 priority 排序
    Collection<Priority> prioritiesToTry = (reserved) ?
        Arrays.asList(node.getReservedContainer().getReservedPriority()) :
        getPriorities();

    // For each priority, see if we can schedule a node local, rack local
    // or off-switch request. Rack of off-switch requests may be delayed
    // (not scheduled) in order to promote better locality.
    synchronized (this) {
      // 按照 priority 從高到低遍歷所有 ResourceRequest
      for (Priority priority : prioritiesToTry) {
        // 判斷該 Container 是否有預留
        // hasContainerForNode() 會分 node、rack、any 三種情況考慮該節點是否有合適的 Container
        if (!reserved && !hasContainerForNode(priority, node)) {
          continue;
        }

        // 排程機會計數加 1
        addSchedulingOpportunity(priority);

        // 下面的邏輯主要根據 NODE_LOCAL、RACK_LOCAL、OFF_SWITCH 三種情況判斷該 ResourceRequest 滿足哪一種排程方式
        ResourceRequest rackLocalRequest = getResourceRequest(priority,
            node.getRackName());
        ResourceRequest localRequest = getResourceRequest(priority,
            node.getNodeName());

        if (localRequest != null && !localRequest.getRelaxLocality()) {
          LOG.warn("Relax locality off is not supported on local request: "
              + localRequest);
        }

        // 省略三種情況的具體選擇邏輯
      }
    }
    return Resources.none();
  }

上面這段程式碼,主要是按照 priority 從高到低的順序遍歷所有的ResourceRequest,針對每個ResourceRequest,在待分配的 node 節點上,根據 NODE_LOCAL、RACK_LOCAL、OFF_SWITCH 三種情況判斷該 ResourceRequest 滿足哪一種排程方式,這裡以NODE_LOCAL 引數為例進入到下一步的排程邏輯。

// 位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  private Resource assignContainer(
      FSSchedulerNode node, ResourceRequest request, NodeType type,
      boolean reserved) {

    // 當前 ResoureRequest 需要多少資源
    Resource capability = request.getCapability();

    // 當前 node 還剩多少資源可分配
    Resource available = node.getAvailableResource();

    Container reservedContainer = null;
    // 判斷是否有預留,有預留在直接從該 node 獲取對應資源。這裡不考慮預留的情況
    if (reserved) {
      reservedContainer = node.getReservedContainer().getContainer();
    }

    // 判斷該 ResourRequest 的資源需求是否能夠在該 node 上得到滿足
    if (Resources.fitsIn(capability, available)) {
      // 重點:node 資源足夠的話,這裡會分配出一個 Container
      RMContainer allocatedContainer =
          allocate(type, node, request.getPriority(), request,
              reservedContainer);
      if (allocatedContainer == null) {
        // Did the application need this resource?
        if (reserved) {
          unreserve(request.getPriority(), node);
        }
        return Resources.none();
      }

      // If we had previously made a reservation, delete it
      if (reserved) {
        unreserve(request.getPriority(), node);
      }

      // 通知 node 記錄該分配出來的 Container
      node.allocateContainer(allocatedContainer);

      // If not running unmanaged, the first container we allocate is always
      // the AM. Set the amResource for this app and update the leaf queue's AM
      // usage
      if (!isAmRunning() && !getUnmanagedAM()) {
        setAMResource(capability);
        getQueue().addAMResourceUsage(capability);
        setAmRunning(true);
      }

      return capability;
    }

    ...  // 省略
  }

重點看看分配邏輯 allocate() 方法。

// 位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
  synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node,
      Priority priority, ResourceRequest request,
      Container reservedContainer) {
    // 更新 locality 級別,忽略
    NodeType allowed = allowedLocalityLevel.get(priority);
    if (allowed != null) {
      if (allowed.equals(NodeType.OFF_SWITCH) &&
          (type.equals(NodeType.NODE_LOCAL) ||
              type.equals(NodeType.RACK_LOCAL))) {
        this.resetAllowedLocalityLevel(priority, type);
      }
      else if (allowed.equals(NodeType.RACK_LOCAL) &&
          type.equals(NodeType.NODE_LOCAL)) {
        this.resetAllowedLocalityLevel(priority, type);
      }
    }

    // Required sanity check - AM can call 'allocate' to update resource
    // request without locking the scheduler, hence we need to check
    if (getTotalRequiredResources(priority) <= 0) {
      return null;
    }

    Container container = reservedContainer;
    if (container == null) {
      // 重點:這裡會具體建立一個 Container 例項
      container =
          createContainer(node, request.getCapability(), request.getPriority());
    }

    // 用 RMContainer 記錄新創建出來的 Container 例項
    RMContainer rmContainer = new RMContainerImpl(container,
        getApplicationAttemptId(), node.getNodeID(),
        appSchedulingInfo.getUser(), rmContext);

    // 重點:記錄 rmContainer,等待下次 AM 心跳發生時,會從這裡把分配出來的 Container 帶走
    newlyAllocatedContainers.add(rmContainer);
    liveContainers.put(container.getId(), rmContainer);

    // Update consumption and track allocations
    List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
        type, node, priority, request, container);
    Resources.addTo(currentConsumption, container.getResource());
    getQueue().incUsedResource(container.getResource());
    // Update resource requests related to "request" and store in RMContainer
    ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

    // 這裡傳送 Container 的 START 事件,更新 Container 狀態
    rmContainer.handle(
        new RMContainerEvent(container.getId(), RMContainerEventType.START));

    if (LOG.isDebugEnabled()) {
      LOG.debug("allocate: applicationAttemptId="
          + container.getId().getApplicationAttemptId()
          + " container=" + container.getId() + " host="
          + container.getNodeId().getHost() + " type=" + type);
    }
    RMAuditLogger.logSuccess(getUser(),
        AuditConstants.ALLOC_CONTAINER, "SchedulerApp",
        getApplicationId(), container.getId());

    return rmContainer;
  }

至此,一個全新的 Container 已經分配出來了,並儲存在 RM 的記憶體資料結構中,那分配出來的 Container 是如何被用到的呢?我們接著後續的邏輯。

3.3 AM 認領資源

上面知道,分配的 Container 已經儲存在 RM 的記憶體資料結構中了,接下來就是 AM 的心跳上報定時領取給自己分配的資源。

3.3.1 AM 啟動併發起資源請求

作業在啟動時,會首先啟動 ApplicationMaster 程序,啟動入口如下:

// 位置:org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  public static void main(String[] args) {
    boolean result = false;
    try {
      ApplicationMaster appMaster = new ApplicationMaster();
      LOG.info("Initializing ApplicationMaster");
      boolean doRun = appMaster.init(args);
      if (!doRun) {
        System.exit(0);
      }
      // ApplicationMaster 啟動 run() 方法
      appMaster.run();
      result = appMaster.finish();
    } catch (Throwable t) {
      LOG.fatal("Error running ApplicationMaster", t);
      LogManager.shutdown();
      ExitUtil.terminate(1, t);
    }
    if (result) {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.info("Application Master failed. exiting");
      System.exit(2);
    }
  }

run() 方法做了什麼事呢?主要是進行 ApplicationMaster 的註冊和心跳。

// 位置:org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
 public void run() throws YarnException, IOException {
    LOG.info("Starting ApplicationMaster");

    ... // 省略

    // 初始化 AMRMClient 例項,用於向 RM 傳送 RPC 請求,這裡採用非同步方式,每個 AMRMClient 都是單獨的執行緒
    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();

    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(conf);
    nmClientAsync.start();

    appMasterHostname = NetUtils.getHostname();
    // 重要:AM 通過 RPC 請求向 RM 註冊,心跳執行緒在註冊邏輯裡啟動
    RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
            appMasterTrackingUrl);
    ... // 省略
  }

AM 向 RM 傳送 RPC 請求是通過ApplicationMasterService 服務實現的,這裡的 AM 註冊和心跳都需要通過該服務與 RM 進行通訊。

// 位置:org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
  public RegisterApplicationMasterResponse registerApplicationMaster(
      String appHostName, int appHostPort, String appTrackingUrl)
      throws YarnException, IOException {
    RegisterApplicationMasterResponse response = client
        .registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
    // 啟動 AM 心跳執行緒
    heartbeatThread.start();
    return response;
  }

這裡忽略registerApplicationMaster() 註冊的邏輯,主要是心跳執行緒做了些什麼,即heartbeatThread 執行緒的工作。

// 位置:org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
    public void run() {
      while (true) {
        AllocateResponse response = null;
        // synchronization ensures we don't send heartbeats after unregistering
        synchronized (unregisterHeartbeatLock) {
          if (!keepRunning) {
            return;
          }

          try {
            // 重要:AM 通過 AMRMClient 客戶端向 RM 傳送請求,進行資源的 allocate() 操作
            response = client.allocate(progress);
          } catch (ApplicationAttemptNotFoundException e) {
            handler.onShutdownRequest();
            LOG.info("Shutdown requested. Stopping callback.");
            return;
          } catch (Throwable ex) {
            LOG.error("Exception on heartbeat", ex);
            savedException = ex;
            // interrupt handler thread in case it waiting on the queue
            handlerThread.interrupt();
            return;
          }
          if (response != null) {
            while (true) {
              try {
                responseQueue.put(response);
                break;
              } catch (InterruptedException ex) {
                LOG.debug("Interrupted while waiting to put on response queue", ex);
              }
            }
          }
        }
        try {
          Thread.sleep(heartbeatIntervalMs.get());
        } catch (InterruptedException ex) {
          LOG.debug("Heartbeater interrupted", ex);
        }
      }
    }

至此,AM 已經向 RM 傳送資源請求,接下來看看 RM 是如何處理這個 RPC 請求的。

3.3.2 RM 處理 AM 請求

RM 中負責處理 AM 心跳請求是通過ApplicationMasterService 服務,其內部的 allocate() 負責處理 AM 的 RPC 資源分配請求,具體邏輯如下:

// 位置:org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  @Override
  public AllocateResponse allocate(AllocateRequest request)
      throws YarnException, IOException {

    AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();

    ApplicationAttemptId appAttemptId =
        amrmTokenIdentifier.getApplicationAttemptId();
    ApplicationId applicationId = appAttemptId.getApplicationId();

    this.amLivelinessMonitor.receivedPing(appAttemptId);

    // 針對每個 appAttempt,會有一個獨立的鎖物件
    AllocateResponseLock lock = responseMap.get(appAttemptId);
    if (lock == null) {
      String message =
          "Application attempt " + appAttemptId
              + " doesn't exist in ApplicationMasterService cache.";
      LOG.error(message);
      throw new ApplicationAttemptNotFoundException(message);
    }
    synchronized (lock) {
      AllocateResponse lastResponse = lock.getAllocateResponse();
      
      // 省略一些神聖的檢查工作

      // 重點:AM 資源請求的心跳函式,傳送新請求和接收之前的分配都需要進行
      Allocation allocation =
          this.rScheduler.allocate(appAttemptId, ask, release, 
              blacklistAdditions, blacklistRemovals);

      // 省略一些狀態更新操作
      
      lock.setAllocateResponse(allocateResponse);
      return allocateResponse;
    }    
  }

繼續跟進this.rScheduler.allocate() 方法,這裡的 scheduler 配置的是 FairScheduler,來看看它的 allocate 方法。

// 位置:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  @Override
  public Allocation allocate(ApplicationAttemptId appAttemptId,
      List<ResourceRequest> ask, List<ContainerId> release,
      List<String> blacklistAdditions, List<String> blacklistRemovals) {
    // 跳過一些檢查工作

    // 記錄 Container 分配的開始開始時間
    application.recordContainerRequestTime(getClock().getTime());

    // 釋放 AM 認為要釋放的 Container
    releaseContainers(release, application);

    synchronized (application) {
      if (!ask.isEmpty()) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("allocate: pre-update" +
              " applicationAttemptId=" + appAttemptId +
              " application=" + application.getApplicationId());
        }
        application.showRequests();

        // 更新應用的資源請求
        application.updateResourceRequests(ask);

        application.showRequests();
      }

      Set<ContainerId> preemptionContainerIds =
          application.getPreemptionContainerIds();
      if (LOG.isDebugEnabled()) {
        LOG.debug(
            "allocate: post-update" + " applicationAttemptId=" + appAttemptId
                + " #ask=" + ask.size() + " reservation= " + application
                .getCurrentReservation());

        LOG.debug("Preempting " + preemptionContainerIds.size()
            + " container(s)");
      }

      if (application.isWaitingForAMContainer(application.getApplicationId())) {
        // Allocate is for AM and update AM blacklist for this
        application.updateAMBlacklist(
            blacklistAdditions, blacklistRemovals);
      } else {
        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      }

      // 重要:這裡就是 AM 獲取最近分配的 Container。這裡獲取的其實就是前面儲存在 RM 記憶體資料結構的 Container。
      ContainersAndNMTokensAllocation allocation =
          application.pullNewlyAllocatedContainersAndNMTokens();

      // Record container allocation time
      if (!(allocation.getContainerList().isEmpty())) {
        application.recordContainerAllocationTime(getClock().getTime());
      }

      return new Allocation(allocation.getContainerList(),
        application.getHeadroom(), preemptionContainerIds, null, null,
        allocation.getNMTokenList());
    }
  }

至此,AM 已經順利拿到 RM 分配的 Container,整理 FairScheduler 資源分配流程基本就是這樣。

【參考資料】