CapacityScheduler: ApplicationMaster Resource Allocation (based on Hadoop 2.7.6)

   Resource allocation is passive: when a node sends a heartbeat (NODE_UPDATE), the scheduler allocates containers based on the resources that node reports.

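   First, a note: the resources needed to start the ApplicationMaster (memory and virtual cores) are already set when the client submits the application (in the YARNRunner class). Memory defaults to 1536 MB and virtual cores default to 1.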
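
To make those defaults concrete, here is a minimal sketch of how an AM resource request could be resolved from job settings. The two property names are the MapReduce AM resource keys; the `AmResourceDefaults` class and the plain `Map` standing in for the Hadoop configuration object are assumptions for illustration only, not YARNRunner's actual code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: resolves the AM container size from job settings. */
class AmResourceDefaults {
    static final String AM_MEMORY_KEY = "yarn.app.mapreduce.am.resource.mb";
    static final String AM_VCORES_KEY = "yarn.app.mapreduce.am.resource.cpu-vcores";
    static final int DEFAULT_AM_MEMORY_MB = 1536; // default quoted in the text above
    static final int DEFAULT_AM_VCORES = 1;       // default quoted in the text above

    /** Returns {memoryMb, vcores}, falling back to the defaults for unset keys. */
    static int[] resolve(Map<String, String> conf) {
        return new int[] {
            intOrDefault(conf.get(AM_MEMORY_KEY), DEFAULT_AM_MEMORY_MB),
            intOrDefault(conf.get(AM_VCORES_KEY), DEFAULT_AM_VCORES)
        };
    }

    private static int intOrDefault(String value, int fallback) {
        return value == null ? fallback : Integer.parseInt(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        int[] am = resolve(conf);
        System.out.println(am[0] + " MB, " + am[1] + " vcore(s)");

        conf.put(AM_MEMORY_KEY, "2048"); // an explicit setting overrides the default
        am = resolve(conf);
        System.out.println(am[0] + " MB, " + am[1] + " vcore(s)");
    }
}
```

With an empty configuration the sketch falls back to 1536 MB and 1 vcore, which is the request the scheduler later has to satisfy for the AM container.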

Code listing:

	case NODE_UPDATE:
	{
	  NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
	  RMNode node = nodeUpdatedEvent.getRMNode();
	  /**
	   * Update the node's information:
	   * 1. Handle containers that have been allocated and are now launched
	   *    The RMContainerEventType.LAUNCHED event is handled by the LaunchedTransition, whose main job is to stop containerAllocationExpirer from monitoring the container, since it is now running
	   * 2. Handle containers that have completed
	   *    Mainly updates the resource counters kept in the queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
	   */
	  nodeUpdate(node);
	  /**
	   * Whether to schedule asynchronously. The default is false; the stock capacity-scheduler.xml does not set it.
	   * Config key: yarn.scheduler.capacity.schedule-asynchronously.enable
	   */
	  if (!scheduleAsynchronously) {
	    /**
	     * Allocate resources
	     */
	    allocateContainersToNode(getNode(node.getNodeID()));
	  }
	}

How the NODE_UPDATE event is handled:
    1. Process the node's updated information
    2. Allocate resources

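/**
   * 	1. Handle containers that have been allocated and are now launched
   * 		Fires the RMContainerEventType.LAUNCHED event, which is handled by the LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, since it is now running (the container was added to containerAllocationExpirer for monitoring while handling the APP_ATTEMPT_ADDED event)
   * 
   * 	2. Handle containers that have completed
   * 		Mainly updates the resource counters kept in the queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
   * @param nm
   */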
   private synchronized void nodeUpdate(RMNode nm) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
    }
    FiCaSchedulerNode node = getNode(nm.getNodeID());
    List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
    List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
    List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
    for(UpdatedContainerInfo containerInfo : containerInfoList) {
      newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
      completedContainers.addAll(containerInfo.getCompletedContainers());
    }
    
    // Processing the newly launched containers
    for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
      /**
       * Fires the RMContainerEventType.LAUNCHED event, which is handled by the LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, since it is now running (the container was added to containerAllocationExpirer for monitoring while handling the APP_ATTEMPT_ADDED event)
       */
      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
    }

    // Process completed containers
    for (ContainerStatus completedContainer : completedContainers) {
      ContainerId containerId = completedContainer.getContainerId();
      LOG.debug("Container FINISHED: " + containerId);
      /**
       * Mainly updates the resource counters kept in the queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
       */
      completedContainer(getRMContainer(containerId), 
          completedContainer, RMContainerEventType.FINISHED);
    }

    // Now node data structures are upto date and ready for scheduling.
    if(LOG.isDebugEnabled()) {
      LOG.debug("Node being looked for scheduling " + nm
        + " availableResource: " + node.getAvailableResource());
    }
  }

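Updating the node's information:
    1. Handle containers that have been allocated and are now launched
           Fires the RMContainerEventType.LAUNCHED event, which is handled by the LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, since it is now running
    2. Handle containers that have completed
           Mainly updates the resource counters kept in the queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node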
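
The two steps above amount to simple bookkeeping. The following is a minimal sketch under stated assumptions: `NodeUpdateSketch` and its methods are hypothetical names, a single memory counter stands in for the queue, user, application and node counters, and a plain set stands in for containerAllocationExpirer.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the bookkeeping done on a node heartbeat. */
class NodeUpdateSketch {
    // Containers still watched by the (stand-in) allocation expirer.
    private final Set<String> expirerWatchList = new HashSet<>();
    // Single counter standing in for the queue/user/app/node counters.
    private int usedMemoryMb;

    /** A container was handed out: start watching it and count its resources. */
    void allocated(String containerId, int memoryMb) {
        expirerWatchList.add(containerId);
        usedMemoryMb += memoryMb;
    }

    /** Step 1: the node reports the container as launched, so stop watching it. */
    void launched(String containerId) {
        expirerWatchList.remove(containerId);
    }

    /** Step 2: the node reports the container as completed, so release its resources. */
    void completed(String containerId, int memoryMb) {
        expirerWatchList.remove(containerId);
        usedMemoryMb -= memoryMb;
    }

    boolean isWatched(String containerId) {
        return expirerWatchList.contains(containerId);
    }

    int usedMemoryMb() {
        return usedMemoryMb;
    }

    public static void main(String[] args) {
        NodeUpdateSketch state = new NodeUpdateSketch();
        state.allocated("container_1", 1536);
        state.launched("container_1");
        System.out.println("watched=" + state.isWatched("container_1")
            + " usedMb=" + state.usedMemoryMb());
        state.completed("container_1", 1536);
        System.out.println("usedMb=" + state.usedMemoryMb());
    }
}
```

The point of the sketch is the ordering: only after both kinds of report are processed do the counters reflect what the node can actually offer, which is why nodeUpdate runs before allocation.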


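Before pasting the allocation code, consider a couple of questions:
   1. Allocation works queue by queue, so how is a queue chosen (in what order and under what conditions)?
   2. Once a queue is chosen, how is an application picked for allocation (in what order are the applications submitted to that queue served)?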
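
For the first question, the code comments later in this post say that child queues are visited in order of usable capacity, with the queue that has the most room first. Here is a minimal sketch of such an ordering. `QueueInfo` and its fields are hypothetical names, and breaking ties by queue path is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

class QueueOrderSketch {
    /** Hypothetical view of a queue: its path and the fraction of its capacity in use. */
    static final class QueueInfo {
        final String path;
        final float usedCapacity;

        QueueInfo(String path, float usedCapacity) {
            this.path = path;
            this.usedCapacity = usedCapacity;
        }
    }

    /** Least-used queue first; ties broken by path (assumed tie-break). */
    static final Comparator<QueueInfo> LEAST_USED_FIRST = new Comparator<QueueInfo>() {
        @Override
        public int compare(QueueInfo a, QueueInfo b) {
            int byUsage = Float.compare(a.usedCapacity, b.usedCapacity);
            return byUsage != 0 ? byUsage : a.path.compareTo(b.path);
        }
    };

    /** Returns the queue paths in the order they would be visited. */
    static List<String> visitOrder(List<QueueInfo> queues) {
        List<QueueInfo> sorted = new ArrayList<>(queues);
        Collections.sort(sorted, LEAST_USED_FIRST);
        List<String> paths = new ArrayList<>();
        for (QueueInfo q : sorted) {
            paths.add(q.path);
        }
        return paths;
    }

    public static void main(String[] args) {
        List<QueueInfo> queues = Arrays.asList(
            new QueueInfo("root.b", 0.75f),
            new QueueInfo("root.c", 0.25f),
            new QueueInfo("root.a", 0.25f));
        System.out.println(visitOrder(queues)); // [root.a, root.c, root.b]
    }
}
```

Sorting by the used fraction rather than by absolute usage means a small queue that is nearly empty is served before a large queue that is nearly full.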

/**
* To keep things simple and make the main flow easy to follow, the reserved case is not covered for now
*/
@VisibleForTesting
  public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
    if (rmContext.isWorkPreservingRecoveryEnabled()
        && !rmContext.isSchedulerReadyForAllocatingContainers()) {
      return;
    }
    /**
     * The node is not registered with the scheduler (never registered, or already removed)
     */
    if (!nodes.containsKey(node.getNodeID())) {
      LOG.info("Skipping scheduling as the node " + node.getNodeID() +
          " has been removed");
      return;
    }

    // Assign new containers...
    // 1. Check for reserved applications
    // 2. Schedule if there are no reservations

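    /**
     * Check whether the node has a reserved container; if it does, serve the reservation first.
     *
     * To keep things simple, the reservedContainer case is not covered for now.
     */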
    RMContainer reservedContainer = node.getReservedContainer();
    if (reservedContainer != null) {
      FiCaSchedulerApp reservedApplication =
          getCurrentAttemptForContainer(reservedContainer.getContainerId());
      
      // Try to fulfill the reservation
      LOG.info("Trying to fulfill reservation for application " + 
          reservedApplication.getApplicationId() + " on node: " + 
          node.getNodeID());
      
      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
      CSAssignment assignment =
          queue.assignContainers(
              clusterResource,
              node,
              new ResourceLimits(labelManager.getResourceByLabel(
                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
      
      RMContainer excessReservation = assignment.getExcessReservation();
      if (excessReservation != null) {
        Container container = excessReservation.getContainer();
        queue.completedContainer(
            clusterResource, assignment.getApplication(), node, 
            excessReservation, 
            SchedulerUtils.createAbnormalContainerStatus(
                container.getId(), 
                SchedulerUtils.UNRESERVED_CONTAINER), 
            RMContainerEventType.RELEASED, null, true);
      }
    }

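    /**
     * minimumAllocation holds the minimum memory and the minimum number of virtual cores; it is set in initScheduler when CapacityScheduler is initialized
     *   Minimum memory: config key yarn.scheduler.minimum-allocation-mb, default 1024 MB
     *   Minimum virtual cores: config key yarn.scheduler.minimum-allocation-vcores, default 1
     */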
    // Try to schedule more if there are no reservations to fulfill
    if (node.getReservedContainer() == null) {
      /**
       * Check whether the node's available resources are sufficient. The calculation is:
       *   node.getAvailableResource() / minimumAllocation
       */
      if (calculator.computeAvailableContainers(node.getAvailableResource(),
        minimumAllocation) > 0) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Trying to schedule on node: " + node.getNodeName() +
              ", available: " + node.getAvailableResource());
        }
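        /**
         * Two questions arise here:
         *   1. Matching starts from root, so which queue is matched first?
         *      Queues are traversed in sorted order of usable capacity: the more usable capacity, the earlier in the order.
         *   2. In what order are requests matched inside a queue?
         *      Inside a queue, requests are matched in FIFO order.
         * 
         *   Note: assignContainers starts matching at the root. assignContainers and assignContainersToChildQueues call each other recursively
         *   until a leaf queue is reached, whose assignContainers performs the actual allocation.
         */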
        root.assignContainers(
            clusterResource,
            node,
            new ResourceLimits(labelManager.getResourceByLabel(
                RMNodeLabelsManager.NO_LABEL, clusterResource)));
      }
    } else {
      LOG.info("Skipping scheduling since node " + node.getNodeID() + 
          " is reserved by application " + 
          node.getReservedContainer().getContainerId().getApplicationAttemptId()
          );
    }
  }

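Main implementation of allocateContainersToNode:
  Starting from the root queue, assignContainers is called to match requests, all the way down to a leaf queue, where the allocation actually happens. Along the way, assignContainers and ParentQueue.assignContainersToChildQueues call each other recursively.
The main checks that decide whether allocation is possible are:
      1. Whether the available resources reported by the node are greater than or equal to the configured minimumAllocation.
      2. Whether the queue's total resource usage after the allocation would exceed the queue's resource limit.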
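
The first check is a division of the node's available resources by minimumAllocation. Below is a minimal sketch of that arithmetic, assuming a memory-only variant (in the style of DefaultResourceCalculator) and a variant that takes the smaller of the memory and vcore quotients (in the style of DominantResourceCalculator). `AvailableContainersSketch` and its method names are hypothetical.

```java
class AvailableContainersSketch {
    /** Memory-only: how many minimum-sized containers fit into the available memory. */
    static int memoryOnly(int availableMb, int minimumMb) {
        return availableMb / minimumMb;
    }

    /** Memory and vcores: the scarcer dimension decides how many containers fit. */
    static int memoryAndVcores(int availableMb, int availableVcores,
                               int minimumMb, int minimumVcores) {
        return Math.min(availableMb / minimumMb, availableVcores / minimumVcores);
    }

    public static void main(String[] args) {
        // Node reports 3000 MB and 0 free vcores; minimumAllocation is 1024 MB / 1 vcore.
        System.out.println(memoryOnly(3000, 1024));            // 2
        System.out.println(memoryAndVcores(3000, 0, 1024, 1)); // 0
    }
}
```

Scheduling on the node is attempted only when the result is greater than 0, so with the default 1024 MB minimum a node reporting less than 1024 MB free is skipped for this heartbeat.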
Back to the main logic code:

@Override
  public synchronized CSAssignment ParentQueue.assignContainers(Resource clusterResource,
      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
    CSAssignment assignment = 
        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
    Set<String> nodeLabels = node.getLabels();
    
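    /**
     * Check whether the node's labels match the queue:
     *   1. If the queue's label is *, it can access any node
     *   2. If the node has no label, any queue can access it
     *   3. If the queue has specific labels, it can only access nodes carrying one of those labels
     */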
    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
      return assignment;
    }
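    /**
     * Check whether the available resources on the node meet the minimumAllocation requirement
     * 
     * Whether the node's resources are usable is decided by comparing them with minimumAllocation: node.getAvailableResource() >= minimumAllocation
     *   1. DefaultResourceCalculator applies this comparison directly and does not need clusterResource
     *   2. DominantResourceCalculator compares resource shares, so it needs clusterResource
     */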
    while (canAssign(clusterResource, node)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to assign containers to child-queue of "
          + getQueueName());
      }
      /**
       * Check whether the current queue's resource limit would be exceeded, that is, whether this queue can still be assigned to
       */
      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
          minimumAllocation, Resources.createResource(getMetrics()
              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
        break;
      }
      
      /**
       * Once the checks pass, delegate to the child queues
       */
      CSAssignment assignedToChild = 
          assignContainersToChildQueues(clusterResource, node, resourceLimits);
      assignment.setType(assignedToChild.getType());
      
      // Done if no child-queue assigned anything
      /**
       * If any resource was assigned, the allocation succeeded
       */
      if (Resources.greaterThan(
              resourceCalculator, clusterResource, 
              assignedToChild.getResource(), Resources.none())) {
        // Track resource utilization for the parent-queue
        /**
         * After a successful allocation, update the parent queue's resource usage
         */
        super.allocateResource(clusterResource, assignedToChild.getResource(),
            nodeLabels);
        
        /**
         * Add the resources assigned by the child queue to the resources assigned by the current queue
         */
        Resources.addTo(assignment.getResource(), assignedToChild.getResource());
        
        LOG.info("assignedContainer" +
            " queue=" + getQueueName() + 
            " usedCapacity=" + getUsedCapacity() +
            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
            " used=" + queueUsage.getUsed() + 
            " cluster=" + clusterResource);

      } else {
        break;
      }

      if (LOG.isDebugEnabled()) {
        LOG.debug("ParentQ=" + getQueueName()
          + " assignedSoFarInThisIteration=" + assignment.getResource()
          + " usedCapacity=" + getUsedCapacity()
          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
      }

      if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
        if (LOG.isDebugEnabled()) {
          if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
            LOG.debug("Not assigning more than one off-switch container," +
                " assignments so far: " + assignment);
          }
        }
        break;
      }
    } 
    
    return assignment;
  }

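Main logic of ParentQueue.assignContainers:
   1. Check whether the reporting node's labels match the queue.
   2. Check whether the reporting node's available resources meet the minimumAllocation requirement.
   3. Check whether the current queue's resource limit would be exceeded.
   4. Once the checks pass, delegate to the child queues for matching.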
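
The delegation in step 4 is a recursive walk down the queue tree. The sketch below shows only that shape: a parent tries its children in order and stops at the first one that allocates, and a leaf does the actual allocation. The class names are hypothetical, resources are reduced to a single memory figure, each leaf holds at most one pending request, and the label, minimumAllocation and queue-limit checks are left out.

```java
import java.util.ArrayList;
import java.util.List;

class QueueTreeSketch {
    interface Queue {
        /** Returns the memory (MB) allocated on this call, or 0 if nothing was allocated. */
        int assignContainers(int availableMb);
    }

    /** Leaf queue: the only place where an allocation really happens. */
    static final class Leaf implements Queue {
        private int pendingMb; // size of the single pending request, 0 when none

        Leaf(int pendingMb) {
            this.pendingMb = pendingMb;
        }

        @Override
        public int assignContainers(int availableMb) {
            if (pendingMb == 0 || pendingMb > availableMb) {
                return 0;
            }
            int assigned = pendingMb;
            pendingMb = 0;
            return assigned;
        }
    }

    /** Parent queue: delegates to its children, which may themselves be parents. */
    static final class Parent implements Queue {
        private final List<Queue> children = new ArrayList<>();

        Parent add(Queue child) {
            children.add(child);
            return this;
        }

        @Override
        public int assignContainers(int availableMb) {
            return assignContainersToChildQueues(availableMb);
        }

        private int assignContainersToChildQueues(int availableMb) {
            for (Queue child : children) {
                int assigned = child.assignContainers(availableMb); // recursion
                if (assigned > 0) {
                    return assigned; // stop at the first child that allocated
                }
            }
            return 0;
        }
    }

    public static void main(String[] args) {
        Parent root = new Parent()
            .add(new Parent().add(new Leaf(4096)))
            .add(new Leaf(1024));
        System.out.println(root.assignContainers(2048)); // 1024: the 4096 MB request does not fit
        System.out.println(root.assignContainers(2048)); // 0: nothing else fits
    }
}
```

In the real scheduler the children would be visited in sorted order and each level would run the checks listed above before descending.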

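/**
   * 	Check whether the node's labels match the queue:
   * 		1. If the queue's label is an asterisk, it can access any node
   * 		2. If the node has no label, any queue can access it
   * 		3. If the queue has specific labels, it can only access nodes carrying one of those labels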
   * @param queueLabels
   * @param nodeLabels
   * @return
   */
  public static boolean checkQueueAccessToNode(Set<String> queueLabels,
      Set<String> nodeLabels) {
    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
      return true;
    }
    // any queue can access to a node without label
    if (nodeLabels == null || nodeLabels.isEmpty()) {
      return true;
    }
    // a queue can access to a node only if it contains any label of the node
    if (queueLabels != null
        && Sets.intersection(queueLabels, nodeLabels).size() > 0) {
      return true;
    }
    return false;
  }

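Checking whether the reporting node's labels match the queue:
   1. If the queue's label is an asterisk, it can access any node
   2. If the node has no label, any queue can access it
   3. If the queue has specific labels, it can only access nodes carrying one of those labels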
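
The three rules can be tried out in isolation. The following sketch mirrors them with plain `java.util` sets; `LabelAccessSketch` is a hypothetical stand-in for the Hadoop utility method, and `Collections.disjoint` replaces the Guava set intersection.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

class LabelAccessSketch {
    static final String ANY = "*";

    static boolean canAccess(Set<String> queueLabels, Set<String> nodeLabels) {
        // Rule 1: a queue labelled "*" can access any node.
        if (queueLabels != null && queueLabels.contains(ANY)) {
            return true;
        }
        // Rule 2: a node without labels is accessible to any queue.
        if (nodeLabels == null || nodeLabels.isEmpty()) {
            return true;
        }
        // Rule 3: otherwise the queue must share at least one label with the node.
        return queueLabels != null && !Collections.disjoint(queueLabels, nodeLabels);
    }

    private static Set<String> labels(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        System.out.println(canAccess(labels("*"), labels("gpu")));        // true
        System.out.println(canAccess(labels("ssd"), labels()));           // true
        System.out.println(canAccess(labels("ssd", "gpu"), labels("gpu"))); // true
        System.out.println(canAccess(labels("ssd"), labels("gpu")));      // false
    }
}
```

The last case is the only one that blocks scheduling: a queue restricted to `ssd` nodes never receives containers on a node labelled only `gpu`.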

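/**
   * Whether the resources on the reporting node are usable, decided by the comparison: node.getAvailableResource() >= minimumAllocation
   * 		1. DefaultResourceCalculator applies this comparison directly and does not need clusterResource
   * 		2. DominantResourceCalculator compares resource shares, so it needs clusterResource
   */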
  private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
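    /**
     * Whether the resources on the reporting node are usable, decided by the comparison: node.getAvailableResource() >= minimumAllocation
     *   1. DefaultResourceCalculator applies this comparison directly and does not need clusterResource
     *   2. DominantResourceCalculator compares resource shares, so it needs clusterResource
     */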
    return (node.getReservedContainer() == null) && 
        Resources.greaterThanOrEqual(resourceCalculator, clusterResource, 
            node.getAvailableResource(), minimumAllocation);
  }

This checks whether the reporting node's available resources meet the minimumAllocation requirement.
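
Why one calculator needs clusterResource and the other does not is easier to see with numbers. The sketch below is a simplification under stated assumptions: `ResourceCompareSketch` and its methods are hypothetical, the memory-only comparison stands in for DefaultResourceCalculator, and the share-based comparison only looks at the dominant share, ignoring the tie-breaking and zero-cluster handling a real calculator would need.

```java
class ResourceCompareSketch {
    /** Memory-only comparison: cluster totals are not needed. */
    static boolean memoryOnlyAtLeast(int availableMb, int minimumMb) {
        return availableMb >= minimumMb;
    }

    /** Dominant share of a resource: its largest fraction of the cluster in any dimension. */
    static double dominantShare(int clusterMb, int clusterVcores, int memoryMb, int vcores) {
        return Math.max((double) memoryMb / clusterMb, (double) vcores / clusterVcores);
    }

    /** Share-based comparison: needs the cluster totals to turn amounts into fractions. */
    static boolean dominantShareAtLeast(int clusterMb, int clusterVcores,
                                        int availableMb, int availableVcores,
                                        int minimumMb, int minimumVcores) {
        return dominantShare(clusterMb, clusterVcores, availableMb, availableVcores)
            >= dominantShare(clusterMb, clusterVcores, minimumMb, minimumVcores);
    }

    public static void main(String[] args) {
        // Cluster: 102400 MB / 100 vcores. Node has 512 MB and 8 vcores free.
        // minimumAllocation: 1024 MB / 1 vcore.
        System.out.println(memoryOnlyAtLeast(512, 1024));                         // false
        System.out.println(dominantShareAtLeast(102400, 100, 512, 8, 1024, 1));   // true
    }
}
```

In this example the two comparisons disagree: memory alone is below the minimum, but the node's free vcores are a larger share of the cluster than the minimum allocation's dominant share.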

/**
   * 	Check whether the current queue's resource limit would be exceeded after the allocation
   * 		
   * @param clusterResource
   * @param nodeLabels
   * @param currentResourceLimits
   * @param nowRequired
   * @param resourceCouldBeUnreserved
   * @return
   */
  synchronized boolean canAssignToThisQueue(Resource clusterResource,
      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
      Resource nowRequired, Resource resourceCouldBeUnreserved) {
    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
    Set<String> labelCanAccess;
    if (null == nodeLabels || nodeLabels.isEmpty()) {
      labelCanAccess = new HashSet<String>();
      // Any queue can always access any node without label
      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
    } else {
      labelCanAccess = new HashSet<String>(
          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
              : Sets.intersection(accessibleLabels, nodeLabels));
    }
    
    for (String label : labelCanAccess) {
      // New total resource = used + required
      Resource newTotalResource =
          Resources.add(queueUsage.getUsed(label), nowRequired);

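      /**
       * Resource limit of a queue without a label: min(resource limit of the queue at this level, limit passed down by the parent)
       * Resource limit of a queue with a label: resource limit of the queue at this level
       * 
       * Root is passed the resources of the whole cluster, so in most cases the result is the resource limit of the queue at this level
       */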
      Resource currentLimitResource =
          getCurrentLimitResource(label, clusterResource, currentResourceLimits);

      /**
       * If the allocation succeeded, would the resource limit be exceeded?
       */
      if (Resources.greaterThan(resourceCalculator, clusterResource,
          newTotalResource, currentLimitResource)) {

        if (this.reservationsContinueLooking
            && label.equals(RMNodeLabelsManager.NO_LABEL)
            && Resources.greaterThan(resourceCalculator, clusterResource,
            resourceCouldBeUnreserved, Resources.none())) {
          // resource-without-reserved = used - reserved
          Resource newTotalWithoutReservedResource =
              Resources.subtract(newTotalResource, resourceCouldBeUnreserved);

          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
              newTotalWithoutReservedResource, currentLimitResource)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("try to use reserved: " + getQueueName()
                  + " usedResources: " + queueUsage.getUsed()
                  + ", clusterResources: " + clusterResource
                  + ", reservedResources: " + resourceCouldBeUnreserved
                  + ", capacity-without-reserved: "
                  + newTotalWithoutReservedResource + ", maxLimitCapacity: "
                  + currentLimitResource);
            }
            currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
                currentLimitResource));
            return true;
          }
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug(getQueueName()
              + "Check assign to queue, label=" + label
              + " usedResources: " + queueUsage.getUsed(label)
              + " clusterResources: " + clusterResource
              + " currentUsedCapacity "
              + Resources.divide(resourceCalculator, clusterResource,
              queueUsage.getUsed(label),
              labelManager.getResourceByLabel(label, clusterResource))
              + " max-capacity: "
              + queueCapacities.getAbsoluteMaximumCapacity(label)
              + ")");
        }
        return false;
      }
      return true;
    }
    return false;
  }

This checks whether the current queue's resource limit would be exceeded after the allocation.
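
Stripped of labels and logging, the limit check is a small decision: used plus required must stay within the limit, unless enough reserved resources could be released to make it fit. The following is a minimal sketch of that decision, assuming memory-only integers; `QueueLimitSketch` and its parameter names are hypothetical.

```java
class QueueLimitSketch {
    /**
     * @param usedMb          resources the queue already uses
     * @param requiredMb      resources the new allocation needs
     * @param limitMb         the queue's current resource limit
     * @param unreservableMb  reserved resources that could be given up
     * @param continueLooking whether giving up reservations is allowed at all
     */
    static boolean canAssignToQueue(int usedMb, int requiredMb, int limitMb,
                                    int unreservableMb, boolean continueLooking) {
        int newTotalMb = usedMb + requiredMb;
        if (newTotalMb <= limitMb) {
            return true; // fits within the limit as is
        }
        if (continueLooking && unreservableMb > 0) {
            // Over the limit, but it may fit once reservations are released.
            return newTotalMb - unreservableMb <= limitMb;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(canAssignToQueue(7000, 1024, 8192, 0, true));    // true
        System.out.println(canAssignToQueue(8000, 1024, 8192, 0, true));    // false
        System.out.println(canAssignToQueue(8000, 1024, 8192, 2048, true)); // true
    }
}
```

In the third call the queue would exceed its 8192 MB limit by 832 MB, but releasing 2048 MB of reservations brings it back under, so the check passes.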

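/**
   * 	Iterate over the child queues of the current queue, which raises a question about the iteration order:
   * 		CapacityScheduler implements a comparator that is used to sort the queues.
   * 			1. Queues are sorted first by usable capacity: the more usable resources, the earlier in the order
   * 			2. When the usable resources are equal, sort by queue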