YARN: FairScheduler Deep Dive (NodeUpdate, assignContainer)

Tags: Hadoop, YARN, FairScheduler

I. Overview

At a high level, YARN's FairScheduler does two things:
① Handle NodeManager (NM) heartbeats (NodeUpdate) and assign containers.
② Maintain the tree of queues and applications, periodically compute fair share and related information, and keep them sorted.

This article focuses on ①.

II. Code

1. Overall flow

① FairScheduler receives the heartbeat

  public void handle(SchedulerEvent event) {
    switch (event.getType()) {
    ....
    case NODE_UPDATE:
      if (!(event instanceof NodeUpdateSchedulerEvent)) {
        throw new RuntimeException("Unexpected event type: " + event);
      }
      // Entry point: receive the NODE_UPDATE event and start handling it
      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
      nodeUpdate(nodeUpdatedEvent.getRMNode());
      break;
    ....
    default:
      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
    }
  }
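The pattern is simple: the RM's async dispatcher delivers scheduler events, and the scheduler switches on the event type and routes NODE_UPDATE to nodeUpdate. The following is a minimal, self-contained illustration of that same shape; MiniScheduler and MiniEvent are hypothetical stand-ins, not YARN classes:

  // Hypothetical, simplified stand-in for the dispatch pattern above; NOT YARN classes.
  public class MiniScheduler {
    enum MiniEventType { NODE_UPDATE, NODE_ADDED, NODE_REMOVED }

    static class MiniEvent {
      final MiniEventType type;
      final String nodeId;   // stands in for the RMNode carried by NodeUpdateSchedulerEvent
      MiniEvent(MiniEventType type, String nodeId) { this.type = type; this.nodeId = nodeId; }
    }

    public void handle(MiniEvent event) {
      switch (event.type) {
      case NODE_UPDATE:
        nodeUpdate(event.nodeId);   // same shape as FairScheduler.nodeUpdate(rmNode)
        break;
      default:
        System.err.println("Unhandled event type: " + event.type);
      }
    }

    private void nodeUpdate(String nodeId) {
      System.out.println("Processing heartbeat from " + nodeId);
    }

    public static void main(String[] args) {
      new MiniScheduler().handle(new MiniEvent(MiniEventType.NODE_UPDATE, "nm-1:8041"));
    }
  }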

② FairScheduler.nodeUpdate

  /**
   * Handle a heartbeat from a NodeManager.
   */
  private synchronized void nodeUpdate(RMNode nm) {
    long start = getClock().getTime();
    if (LOG.isDebugEnabled()) {
      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
    }
    eventLog.log("HEARTBEAT", nm.getHostName());
    // Look up the scheduler-side view of the NM that sent this heartbeat
    FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());

    // Pull the container status updates reported by this NM
    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    // Split out the containers that were launched and the containers that completed
    // between the previous heartbeat and this one
    for (UpdatedContainerInfo containerInfo : containerInfoList) {
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      completedContainers.addAll(containerInfo.getCompletedContainers());
    }

    // Process newly launched containers and remove them from the timeout monitor.
    // Whenever the RM allocates a container, it is added to a timeout queue so that a
    // container the AM never uses does not waste resources; if it stays unused for too
    // long it is reclaimed.
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

    // Process completed containers and update cluster resource accounting
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      completedContainer(getRMContainer(containerId),
          completedContainer, RMContainerEventType.FINISHED);
    }

    // Pull and apply container resource usage reports
    List<ContainerResourceToReport> containerResourceToReports =
        nm.pullContainerResourceUpdates();
    for (ContainerResourceToReport containerResource : containerResourceToReports) {
      updateContainerResource(containerResource);
    }

    if (continuousSchedulingEnabled) {
      // Continuous scheduling; not covered in this article
      if (!balanceSchedulingEnabled && !completedContainers.isEmpty()) {
        attemptScheduling(node);
      }
    } else {
      // Core container-assignment entry point
      attemptScheduling(node);
    }

    // Record how long this nodeUpdate call took (metrics)
    long duration = getClock().getTime() - start;
    fsOpDurations.addNodeUpdateDuration(duration);
  }

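The timeout queue mentioned in the comments is the RM's allocation-expiry mechanism (the Hadoop code base implements it with a ContainerAllocationExpirer): a container the AM never launches is eventually reclaimed. Below is a simplified, hypothetical sketch of that idea; the class and method names are illustrative and not the real implementation:

  import java.util.ArrayList;
  import java.util.List;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  // Hypothetical sketch: record when each container was allocated, stop tracking it once the
  // NM reports it launched, and flag anything still unused past the timeout for reclamation.
  public class AllocationExpiryMonitor {
    private final long expiryMs;
    private final Map<String, Long> allocatedAt = new ConcurrentHashMap<>();

    public AllocationExpiryMonitor(long expiryMs) { this.expiryMs = expiryMs; }

    // Called when the scheduler hands a container to an AM.
    public void register(String containerId) {
      allocatedAt.put(containerId, System.currentTimeMillis());
    }

    // Called from the nodeUpdate path when the NM reports the container as launched.
    public void containerLaunched(String containerId) {
      allocatedAt.remove(containerId);
    }

    // Periodic scan: containers the AM never launched within the timeout should be reclaimed.
    public List<String> findExpired() {
      long now = System.currentTimeMillis();
      List<String> expired = new ArrayList<>();
      allocatedAt.forEach((id, allocTime) -> {
        if (now - allocTime > expiryMs) {
          expired.add(id);
        }
      });
      return expired;
    }
  }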
Drilling into the details.
Handling containers launched during the previous heartbeat interval:
containerLaunchedOnNode

  protected synchronized void containerLaunchedOnNode(
      ContainerId containerId, SchedulerNode node) {
    // Look up the application this container belongs to
    SchedulerApplicationAttempt application = getCurrentAttemptForContainer
        (containerId);
    if (application == null) {
      LOG.info("Unknown application "
          + containerId.getApplicationAttemptId().getApplicationId()
          + " launched container " + containerId + " on node: " + node);
      this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
      return;
    }
    // The application is still alive; forward the event to it
    application.containerLaunchedOnNode(containerId, node.getNodeID());
  }

containerLaunchedOnNode then calls SchedulerApplicationAttempt.containerLaunchedOnNode:
  public synchronized void containerLaunchedOnNode(ContainerId containerId,
      NodeId nodeId) {
    // Look up the RMContainer record the RM keeps for this container
    RMContainer rmContainer = getRMContainer(containerId);
    if (rmContainer == null) {
      // The RM has no record of this container, so tell the NM to clean it up
      rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
      return;
    }

    // Fire the LAUNCHED event on the container's state machine
    rmContainer.handle(new RMContainerEvent(containerId,
        RMContainerEventType.LAUNCHED));
  }
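On the RM side, the LAUNCHED event drives the container's state machine (RMContainerImpl). A highly simplified, hypothetical sketch of the relevant transitions follows; the real class uses YARN's StateMachineFactory and has many more states and edge cases:

  // Hypothetical sketch (NOT the real RMContainerImpl): an allocated container is ACQUIRED by
  // the AM, moves to RUNNING when the NM reports it launched, and to COMPLETED when it exits.
  public class MiniContainerStateMachine {
    enum State { ALLOCATED, ACQUIRED, RUNNING, COMPLETED }
    enum Event { ACQUIRED, LAUNCHED, FINISHED }

    static State transition(State state, Event event) {
      switch (event) {
      case ACQUIRED:  return State.ACQUIRED;   // the AM pulled the allocated container
      case LAUNCHED:  return State.RUNNING;    // the NM reported the container as started
      case FINISHED:  return State.COMPLETED;  // the container exited
      default:        return state;
      }
    }
  }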

③ FairScheduler.attemptScheduling

  synchronized boolean attemptScheduling(FSSchedulerNode node) {
    boolean hasAssigned = false;
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      // The scheduler is not ready to allocate containers yet; return false
      return hasAssigned;
    }

    final NodeId nodeID = node.getNodeID();
    // The node is gone; return false
    if (!nodes.containsKey(nodeID)) {
      // The node might have just been removed while this thread was waiting
      // on the synchronized lock before it entered this synchronized method
      LOG.info("Skipping scheduling as the node " + nodeID +
          " has been removed");
      return hasAssigned;
    }

    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations

    FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
    // If there is a reservation, try to satisfy it first; if it cannot be satisfied now,
    // wait for the next heartbeat. Only after the reservation is fulfilled (or released)
    // can other applications get containers on this node.
    if (reservedAppSchedulable != null) {
      SchedulerRequestKey schedulerKey = node.getReservedContainer().getReservedSchedulerKey();
      FSQueue queue = reservedAppSchedulable.getQueue();

      // The node can host this container only if locality and remaining resources both match
      // (hasContainerForNode), and the queue's allocated plus to-be-allocated resources stay
      // within its max share (fitsInMaxShare)
      if (!reservedAppSchedulable.hasContainerForNode(schedulerKey, node)
          || !fitsInMaxShare(queue,
          node.getReservedContainer().getReservedResource())) {
        // Don't hold the reservation if app can no longer use it
        LOG.info("Releasing reservation that cannot be satisfied for application "
            + reservedAppSchedulable.getApplicationAttemptId()
            + " on node " + node);
        // One of the two conditions above failed, i.e. the node or the queue is full, so release the reservation
        reservedAppSchedulable.unreserve(schedulerKey, node);
        reservedAppSchedulable = null;
      } else {
        // Reservation exists; try to fulfill the reservation
        if (LOG.isDebugEnabled()) {
          LOG.debug("Trying to fulfill reservation for application "
              + reservedAppSchedulable.getApplicationAttemptId()
              + " on node: " + node);
        }
        // Try to assign the reserved container, or keep the resources reserved
        Resource assignedReservedContainer = node.getReservedAppSchedulable().assignReservedContainer(node);
        hasAssigned = (assignedReservedContainer != Resources.none());
      }
    }
    // No reservation: run the normal container-assignment path
    if (reservedAppSchedulable == null) {
      // No reservation, schedule at queue which is farthest below fair share
      int assignedContainers = 0;
      Resource assignedResource = Resources.clone(Resources.none());
      // At most 50% of the node's currently available resources may be assigned per heartbeat
      Resource maxResourcesToAssign =
          Resources.multiply(node.getAvailableResource(), 0.5f);
      // Keep looping while the node has no reservation and its remaining resources can still
      // fit the minimum allocation, i.e. try to use up the node's free resources
      while (node.getReservedContainer() == null &&
          Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
        boolean assignedContainer = false;
        long start = System.nanoTime();
        // Offer this node to the queues in childQueues in sorted order; when resources run
        // short, a reservation may be made. The key method assignContainer is analyzed below.
        Resource assignment = queueMgr.getRootQueue().assignContainer(node);
        // Resources were assigned: update the counters and record the call duration
        if (!assignment.equals(Resources.none())) {
          assignedContainers++;
          assignedContainer = true;
          hasAssigned = true;
          Resources.addTo(assignedResource, assignment);
          long end = System.nanoTime();
          fsOpDurations.addAssignContainerCallDuration((end - start)/1000);
        }
        // Nothing was assigned, meaning the node has no usable resources or there are no
        // pending requests, so there is no point in continuing the loop
        if (!assignedContainer) { break; }
        // Stop after a single assignment if assignMultiple is disabled, and also stop once the
        // number of containers assigned on this node reaches the per-heartbeat cap maxAssign;
        // otherwise keep looping and assigning
        if (!shouldContinueAssigning(assignedContainers,
            maxResourcesToAssign, assignedResource)) {
          break;
        }
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Schedule node:" + node.getNodeName() + "; " +
            "Max resources to assign:" + maxResourcesToAssign + "; " + 
            "Assign container numbers:" + assignedContainers);
      }
    }
    updateRootQueueMetrics();
    return hasAssigned;
  }
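The loop's exit condition is governed by shouldContinueAssigning. Below is a hedged sketch of the kind of decision it makes; the exact implementation in this fork may differ, and the sketch assumes YARN's Resource/Resources helpers plus the yarn.scheduler.fair.assignmultiple and yarn.scheduler.fair.max.assign settings:

  // Illustrative sketch only: whether the loop above should keep assigning on the same node.
  private boolean shouldContinueAssigningSketch(
      boolean assignMultiple,          // yarn.scheduler.fair.assignmultiple
      int maxAssign,                   // yarn.scheduler.fair.max.assign (<= 0 means unlimited)
      int assignedContainers,
      Resource assignedResource,
      Resource maxResourcesToAssign) { // the 50% cap computed above
    if (!assignMultiple) {
      return false;                    // only one container per node per heartbeat
    }
    if (maxAssign > 0 && assignedContainers >= maxAssign) {
      return false;                    // reached the per-heartbeat container-count cap
    }
    // keep going only while the resources handed out stay within the per-heartbeat cap
    return Resources.fitsIn(assignedResource, maxResourcesToAssign);
  }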

④ FSParentQueue.assignContainer

public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();

    // Pre-check fails if the queue's used plus to-be-assigned resources exceed its limit,
    // or if the node already has a reserved container
    if (!assignContainerPreCheck(node)) {
      LOG.info("Cannot assign container to queue: " + getQueueName() +
          ", for it usage resource over its limit.");
      return assigned;
    }
  
    boolean isRoundEnd = false;
    readLock.lock();
    // pendingChildQueues is childQueues filtered down to the queues that still have pending
    // demand. This optimization has not been contributed upstream; you can treat it as
    // childQueues.
    int childQueuesSize = pendingChildQueues.size();
    int preScheduleIndex = createScheduleIndex(childQueuesSize);
    try {
      // Starting from the head of the sorted queue list, assign containers to satisfy
      // each queue's requests in turn
      while (getCurrentScheduleIndex() < childQueuesSize) {
        FSQueue child = pendingChildQueues.get(getCurrentScheduleIndex());
        // Once a queue with no demand is reached, all remaining queues have no demand either
        // (the sort order places queues with less demand later)
        if (child.getDemand().equals(Resources.none())) {
          incCurrentScheduleIndex();
          isRoundEnd = true;
          break;
        }
        // Delegate the assignment to the child queue / application
        assigned = child.assignContainer(node);
        // Something was assigned on this node; stop after the first successful assignment
        if (!Resources.equals(assigned, Resources.none())) {
          break;
        }
        // Move the queue pointer forward
        incCurrentScheduleIndex();
      }
      // The pointer walked past the last pending child queue, so this round is finished
      if (!isRoundEnd && getCurrentScheduleIndex() >= childQueuesSize) {
        isRoundEnd = true;
      }
    } finally {
      readLock.unlock();
    }
  
    // At the root queue, once a round has covered all pendingChildQueues, reset the pointer to the head of the list
    if (parent == null && isRoundEnd) {
      resetCurrentScheduleIndex();
    }
  
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() +
          ", offered to queue: " + getName() +
          ", prev schedule index:" + preScheduleIndex +
          ", current schedule index:" + getCurrentScheduleIndex() +
          ", child queue count:" + childQueuesSize +
          ", assigned:" + assigned);
    }

    return assigned;
  }
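Since pendingChildQueues is described above as childQueues filtered down to queues that still have pending demand, here is a hedged sketch of how such a filtered view could be derived. It is illustrative only; the actual patch in this fork may maintain the list incrementally rather than recomputing it, and the sketch assumes the FSQueue/Resources APIs used elsewhere in this post:

  // Illustrative sketch only: derive the "pending" child queues from the sorted childQueues,
  // mirroring the demand check used in the loop above.
  private List<FSQueue> computePendingChildQueues(List<FSQueue> childQueues) {
    List<FSQueue> pending = new ArrayList<>();
    for (FSQueue child : childQueues) {
      if (!child.getDemand().equals(Resources.none())) {
        pending.add(child);   // keep only queues that still demand resources
      }
    }
    return pending;
  }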

⑤ FSLeafQueue.assignContainer

public Resource assignContainer(FSSchedulerNode node) {
    Resource assigned = Resources.none();
    // Same pre-check as FSParentQueue.assignContainer, plus a node-label check: if the
    // node's label does not match, return without assigning
    if (!nodeLabelCheck(node.getNodeID()) || !assignContainerPreCheck(node)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Node: " + node.getNodeID() +
            " cannot assigned to queue: " + getQueueName() +
            " for resource usage may over its limits or node label does not match.");
      }
      return assigned;
    }
  
    readLock.lock();
    int runnableAppsSize = pendingRunnableApps.size();
    int preScheduleIndex = createScheduleIndex(runnableAppsSize);
    try {
      for ( ; getCurrentScheduleIndex() < runnableAppsSize; incCurrentScheduleIndex()) {
        FSAppAttempt appAttempt = pendingRunnableApps.get(getCurrentScheduleIndex());
        // Skip app attempts that the RM context no longer tracks
        if (!appAttempt.isInRMContext()) {
          LOG.error(appAttempt.getApplicationId()
              + " is not in rmContext, this is not expected");
          continue;
        }
        // Skip this node if the application has blacklisted it
        if (SchedulerAppUtils.isBlacklisted(appAttempt, node, LOG)) {
          continue;
        }
        Resource pending = appAttempt.getAppAttemptResourceUsage().getPending();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Assign container on: " + node.getNodeName()
              + ", app attempt id:" + appAttempt.getName()
              + ", app demand:" + appAttempt.getDemand()
              + ", app resource usage:" + appAttempt.getResourceUsage()
              + ", pending:" + pending);
        }
        // The app still has pending requests, so try to assign on this node
        if (!pending.equals(Resources.none())) {
          assigned = appAttempt.assignContainer(node);
          if (!assigned.equals(Resources.none())) {
            // Something was assigned; no need to try further apps, stop here
            break;
          }
        }
      }
    } finally {
      readLock.unlock();
    }
  
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node " + node.getNodeName() +
          ", offered to queue: " + getName() +
          ", prev schedule index:" + preScheduleIndex +
          ", current schedule index:" + getCurrentScheduleIndex() +
          ", runnable apps count:" + runnableAppsSize +
          ", assigned:" + assigned);
    }
    
    return assigned;
  }
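For reference, the blacklist check essentially asks whether the node's host or rack has been blacklisted by the application. A hedged sketch of that test follows; the helper name is hypothetical, and the real SchedulerAppUtils.isBlacklisted roughly delegates to the app attempt and logs when a node is skipped:

  // Illustrative sketch only: an application can blacklist individual hosts as well as racks.
  private boolean isBlacklistedSketch(SchedulerApplicationAttempt app, SchedulerNode node) {
    return app.isBlacklisted(node.getNodeName())
        || app.isBlacklisted(node.getRackName());
  }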

⑥ FSAppAttempt.assignContainer

  public Resource assignContainer(FSSchedulerNode node) {
    // Do not assign if the AMs in this queue already use too much; by default the total AM
    // resources in a queue may not exceed maxShare * 0.5f
    if (isOverAMShareLimit()) {
      return Resources.none();
    }
    return assignContainer(node, false);
  }
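A hedged sketch of the comparison behind isOverAMShareLimit, following the comment above. The base share and getter names vary between versions and forks (upstream compares against the queue's fair share, while this post's comment refers to maxShare), so treat the names below as assumptions; maxAMShare defaults to 0.5f:

  // Illustrative sketch only: the queue's AM resource usage plus this AM's request
  // must stay within maxAMShare of the queue's (max) share.
  private boolean isOverAMShareLimitSketch(FSLeafQueue queue, Resource amResourceRequest,
      float maxAMShare /* default 0.5f */) {
    Resource maxAMResource = Resources.multiply(queue.getMaxShare(), maxAMShare);
    Resource ifRunAM = Resources.add(queue.getAmResourceUsage(), amResourceRequest);
    return !Resources.fitsIn(ifRunAM, maxAMResource);
  }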

The inner assignContainer first determines the allowed locality for the request, which comes in three levels (the NodeType enum): NODE_LOCAL, RACK_LOCAL and OFF_SWITCH. The names mean exactly what they say.
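Their meanings, rendered as a minimal enum for reference (the real enum lives in the RM scheduler package):

  // The three locality levels, from most to least specific:
  public enum NodeType {
    NODE_LOCAL,   // the container can run on the exact host the request asked for
    RACK_LOCAL,   // the container can run on some node in the requested rack
    OFF_SWITCH    // the container can run anywhere in the cluster
  }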

One level further down, the innermost assignContainer:

private Resource assignContainer(
      FSSchedulerNode node, ResourceRequest request, NodeType type,
      boolean reserved, SchedulerRequestKey schedulerKey) {

    // Check the node-label expression against the node's labels
    if (!SchedulerUtils.checkNodeLabelExpression(node.getLabels(),
        request.getNodeLabelExpression())) {
      return Resources.none();
    }

    // Resources requested by this resource request
    Resource capability = request.getCapability();

    // Resources currently available on this NM
    Resource available = node.getAvailableResource();

    Container container = null;
    // If this is a reserved allocation, reuse the reserved container;
    // otherwise create a new container from the capability and schedulerKey
    if (reserved) {
      container = node.getReservedContainer().getContainer();
    } else {
      container = createContainer(node, capability, schedulerKey);
    }

    // The requested resources fit into the NM's available resources
    if (Resources.fitsIn(capability, available)) {
      // Allocate the container and fire the start event
      RMContainer allocatedContainer =
          allocate(type, node, schedulerKey, request, container);
      // Another thread has already allocated enough containers for this request, so nothing is assigned this time
      if (allocatedContainer == null) {
        // Release the reservation
        if (reserved) {
          unreserve(schedulerKey, node);
        }
        return Resources.none();
      }

      // Release the reservation
      if (reserved) {
        unreserve(schedulerKey, node);
      }

      // Update the node's bookkeeping (container count, available resources, etc.)
      node.allocateContainer(allocatedContainer);
  
      // The AM is not running yet, so this first container is the AM; record its resources
      if (!isAmRunning() && !getUnmanagedAM()) {
        setAMResource(container.getResource());
        getQueue().addAMResourceUsage(container.getResource());
        setAmRunning(true);
      }

      return container.getResource();
    } else {
      // Not enough free resources on this node

      // If the queue cannot stay within its max share limit, do not reserve
      if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
        return Resources.none();
      }

      // Not enough resources, so reserve them on this node. Roughly, reserve() first checks
      // whether a reservation already exists: if not, it creates a new RMContainer; if it
      // does, it adds this request to currentReservation. In both cases it fires a reserve event.
      if (reserve(schedulerKey, node, container, type, reserved)) {
        return FairScheduler.CONTAINER_RESERVED;
      } else {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Couldn't create reservation for app:  " + getName()
              + ", at priority " +  request.getPriority());
        }
        return Resources.none();
      }
    }
  }
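fitsInMaxShare shows up both in the reservation path of attemptScheduling and in the reserve branch here. A hedged sketch of what it checks: the queue's current usage plus the additional resources must stay within the configured max share at every level of the hierarchy, walking up through the parent queues (an iterative rendering; the upstream method is recursive):

  // Illustrative sketch of FairScheduler.fitsInMaxShare: a container (or reservation) only
  // fits if the queue and all of its ancestors stay within their max share after adding it.
  public static boolean fitsInMaxShareSketch(FSQueue queue, Resource additional) {
    for (FSQueue current = queue; current != null; current = current.getParent()) {
      Resource usagePlusAddition =
          Resources.add(current.getResourceUsage(), additional);
      if (!Resources.fitsIn(usagePlusAddition, current.getMaxShare())) {
        return false;   // this level of the hierarchy would exceed its limit
      }
    }
    return true;
  }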

III. Summary

This article covered how FairScheduler handles NodeManager heartbeats (NodeUpdate) and assigns containers. The other half of its job, maintaining the tree of queues and applications, periodically computing fair share and related information, and keeping them sorted, was covered in an earlier article.

Overall, this part of the YARN code base is fairly simple and clear to read. A follow-up article will cover CapacityScheduler.