CapacityScheduler -- ApplicationMaster resource allocation (based on Hadoop 2.7.6)
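Resource allocation is passive: when a node sends its heartbeat (NODE_UPDATE), the scheduler allocates based on the resources that node reports.
One note up front: the resources the ApplicationMaster needs to start (memory and virtual cores) are already initialized when the client submits the application (in the YARNRunner class). Memory defaults to 1536 MB and virtual cores default to 1.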
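As a rough illustration of where those defaults come from, the sketch below looks up the two ApplicationMaster settings with their fallback values. The class `AmResourceDefaults` and the use of `java.util.Properties` are stand-ins invented for this post, not the YARNRunner code itself; the property names are the standard MapReduce ones as I understand them.

```java
import java.util.Properties;

// Hypothetical stand-in for the client-side lookup; not the actual YARNRunner code.
class AmResourceDefaults {
    // Property names assumed to be the MapReduce AM resource settings.
    static final String AM_MEMORY_KEY = "yarn.app.mapreduce.am.resource.mb";
    static final String AM_VCORES_KEY = "yarn.app.mapreduce.am.resource.cpu-vcores";

    // Memory requested for the AM container, falling back to 1536 MB.
    static int amMemoryMb(Properties conf) {
        return Integer.parseInt(conf.getProperty(AM_MEMORY_KEY, "1536"));
    }

    // Virtual cores requested for the AM container, falling back to 1.
    static int amVcores(Properties conf) {
        return Integer.parseInt(conf.getProperty(AM_VCORES_KEY, "1"));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(amMemoryMb(conf) + " MB, " + amVcores(conf) + " vcore(s)");
        // An explicit setting overrides the default.
        conf.setProperty(AM_MEMORY_KEY, "2048");
        System.out.println(amMemoryMb(conf) + " MB, " + amVcores(conf) + " vcore(s)");
    }
}
```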
Code listing:
case NODE_UPDATE :
{
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
RMNode node = nodeUpdatedEvent.getRMNode();
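/**
* Update node information:
* 1. Handle allocated containers that have now launched
* This fires RMContainerEventType.LAUNCHED, which is handled by LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, because the container is already running
* 2. Handle completed containers
* Mainly updates the resource accounting in queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
*/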
nodeUpdate(node);
/**
* Whether to allocate asynchronously. The default is false; the stock capacity-scheduler.xml does not set it.
* Property: yarn.scheduler.capacity.schedule-asynchronously.enable
*/
if (!scheduleAsynchronously) {
/**
* Perform resource allocation
*/
allocateContainersToNode(getNode(node.getNodeID()));
}
}
NODE_UPDATE event handling logic:
1. Process the node's updated information
2. Allocate resources
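/**
* 1. Handle allocated containers that have now launched
* This fires RMContainerEventType.LAUNCHED, which is handled by LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, because the container is already running (the container was added to containerAllocationExpirer for monitoring while the APP_ATTEMPT_ADDED event was being handled)
*
* 2. Handle completed containers
* Mainly updates the resource accounting in queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
* @param nm
*/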
private synchronized void nodeUpdate(RMNode nm) {
if (LOG.isDebugEnabled()) {
LOG.debug("nodeUpdate: " + nm + " clusterResources: " + clusterResource);
}
FiCaSchedulerNode node = getNode(nm.getNodeID());
List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
for(UpdatedContainerInfo containerInfo : containerInfoList) {
newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
completedContainers.addAll(containerInfo.getCompletedContainers());
}
// Processing the newly launched containers
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
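/**
* Fires RMContainerEventType.LAUNCHED, which is handled by LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, because the container is already running (the container was added to containerAllocationExpirer for monitoring while the APP_ATTEMPT_ADDED event was being handled)
*/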
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
// Process completed containers
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
/**
* Mainly updates the resource accounting in queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node
*/
completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
// Now node data structures are upto date and ready for scheduling.
if(LOG.isDebugEnabled()) {
LOG.debug("Node being looked for scheduling " + nm
+ " availableResource: " + node.getAvailableResource());
}
}
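Updating the node information:
1. Handle allocated containers that have now launched
This fires RMContainerEventType.LAUNCHED, which is handled by LaunchedTransition. Its main job is to stop containerAllocationExpirer from monitoring the container, because the container is already running.
2. Handle completed containers
Mainly updates the resource accounting in queue, user, (FiCaSchedulerApp) application and (FiCaSchedulerNode) node.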
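To make the two bookkeeping steps concrete, here is a minimal, memory-only sketch of what a heartbeat does to a node's state. Everything in it (`NodeUpdateSketch`, `awaitingLaunch`, `onHeartbeat`) is a hypothetical simplification for this post: the real code routes these steps through RMContainer state transitions and also updates the queue, user and application counters.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of the bookkeeping done in nodeUpdate; not the CapacityScheduler code.
class NodeUpdateSketch {
    // Containers allocated but not yet reported as running
    // (plays the role of containerAllocationExpirer in this sketch).
    final Set<String> awaitingLaunch = new HashSet<>();
    // Memory held by each live container on this node, in MB.
    final Map<String, Integer> containerMemoryMb = new HashMap<>();
    int availableMb;

    NodeUpdateSketch(int totalMb) {
        this.availableMb = totalMb;
    }

    // Allocation reserves memory and starts the launch watch.
    void allocate(String containerId, int memoryMb) {
        containerMemoryMb.put(containerId, memoryMb);
        awaitingLaunch.add(containerId);
        availableMb -= memoryMb;
    }

    // Heartbeat: launched containers leave the watch list,
    // completed containers give their memory back to the node.
    void onHeartbeat(List<String> launched, List<String> completed) {
        for (String id : launched) {
            awaitingLaunch.remove(id);
        }
        for (String id : completed) {
            awaitingLaunch.remove(id);
            Integer mb = containerMemoryMb.remove(id);
            if (mb != null) {
                availableMb += mb;
            }
        }
    }

    public static void main(String[] args) {
        NodeUpdateSketch node = new NodeUpdateSketch(8192);
        node.allocate("container_1", 1536);
        node.allocate("container_2", 1024);
        node.onHeartbeat(Arrays.asList("container_1"), Arrays.asList("container_2"));
        System.out.println("available MB: " + node.availableMb);
    }
}
```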
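Before pasting the allocation code, a few questions to think about:
1. Allocation works queue by queue, so how is a queue chosen (in what order, under what conditions)?
2. Once a queue is chosen, how is an application chosen (in what order are the applications submitted to that queue served)?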
/**
* To keep things simple and follow the main flow first, ignore the reserved case for now
*/
@VisibleForTesting
public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
if (rmContext.isWorkPreservingRecoveryEnabled()
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
return;
}
/**
* The node is not registered (or has already been removed)
*/
if (!nodes.containsKey(node.getNodeID())) {
LOG.info("Skipping scheduling as the node " + node.getNodeID() +
" has been removed");
return;
}
// Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations
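/**
* Check whether the node has a reserved container; if it does, serve the reservation first
*
* To keep things simple, ignore the reservedContainer case for now
*/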
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication =
getCurrentAttemptForContainer(reservedContainer.getContainerId());
// Try to fulfill the reservation
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " +
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment =
queue.assignContainers(
clusterResource,
node,
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
Container container = excessReservation.getContainer();
queue.completedContainer(
clusterResource, assignment.getApplication(), node,
excessReservation,
SchedulerUtils.createAbnormalContainerStatus(
container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, true);
}
}
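/**
* minimumAllocation holds the minimum memory and the minimum number of virtual CPUs; it is set in initScheduler when CapacityScheduler initializes
* Minimum memory: property yarn.scheduler.minimum-allocation-mb, default 1024 MB
* Minimum virtual CPUs: property yarn.scheduler.minimum-allocation-vcores, default 1
*/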
// Try to schedule more if there are no reservations to fulfill
if (node.getReservedContainer() == null) {
/**
* Can the node's available resources fit at least one container? Computed as:
* node.getAvailableResource() / minimumAllocation
*/
if (calculator.computeAvailableContainers(node.getAvailableResource(),
minimumAllocation) > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
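/**
* Two questions arise here:
* 1. Matching starts from root, so which queue is matched first?
* Queues are sorted and traversed by available capacity; the more capacity a queue has available, the earlier it comes
* 2. In what order are requests matched inside a queue?
* Inside a queue, requests are matched in FIFO order
*
* Note: assignContainers starts matching at the root. assignContainers and assignContainersToChildQueues call each other recursively,
* and only at a leaf queue does the leaf's assignContainers perform the actual allocation
*/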
root.assignContainers(
clusterResource,
node,
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
" is reserved by application " +
node.getReservedContainer().getContainerId().getApplicationAttemptId()
);
}
}
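What allocateContainersToNode mainly does:
Matching starts by calling assignContainers on the root queue and continues down to a leaf queue, where the allocation actually happens. Along the way, assignContainers and ParentQueue.assignContainersToChildQueues call each other recursively.
The main checks that decide whether allocation is possible:
1. Whether the available resources reported by the node are at least the configured minimumAllocation.
2. Whether the queue's total usage after the allocation would exceed the queue's resource limit.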
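The first check boils down to an integer division. The sketch below shows the idea for a memory-only calculator and for one that also considers vcores; the class and method names are made up for this post, and the arithmetic is my simplified reading of what the two resource calculators do, not a copy of their code.

```java
// Simplified view of "how many minimum-size containers fit on this node".
class AvailableContainersSketch {
    // Memory-only view: vcores are ignored entirely.
    static int memoryOnly(int availableMb, int minimumMb) {
        return availableMb / minimumMb;
    }

    // Multi-resource view: the scarcest resource decides.
    static int memoryAndVcores(int availableMb, int availableVcores,
                               int minimumMb, int minimumVcores) {
        return Math.min(availableMb / minimumMb, availableVcores / minimumVcores);
    }

    public static void main(String[] args) {
        // 4096 MB free, minimum allocation 1024 MB / 1 vcore.
        System.out.println(memoryOnly(4096, 1024));
        // Same memory, but no vcores left on the node.
        System.out.println(memoryAndVcores(4096, 0, 1024, 1));
    }
}
```

Scheduling on the node only proceeds when the result is greater than zero, which is the `computeAvailableContainers(...) > 0` test in the listing above.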
Back to the main logic:
@Override
public synchronized CSAssignment ParentQueue.assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits resourceLimits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
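/**
* Check whether the node's labels match the queue's:
* 1. If the queue's label is *, it can access any node
* 2. If the node has no label, any queue can access it
* 3. If the queue has specific labels, it can only access nodes carrying one of those labels
*/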
if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
return assignment;
}
/**
* Check whether the node's available resources meet minimumAllocation
*
* The node is usable when node.getAvailableResource() >= minimumAllocation
* 1. DefaultResourceCalculator evaluates this directly (on memory only) and does not need clusterResource
* 2. DominantResourceCalculator compares resource shares, so it needs clusterResource
*/
while (canAssign(clusterResource, node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign containers to child-queue of "
+ getQueueName());
}
/**
* Check whether this queue's resource limit would be exceeded, i.e. whether this queue can still allocate
*/
if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
minimumAllocation, Resources.createResource(getMetrics()
.getReservedMB(), getMetrics().getReservedVirtualCores()))) {
break;
}
/**
* Once the checks pass, dispatch to the child queues
*/
CSAssignment assignedToChild =
assignContainersToChildQueues(clusterResource, node, resourceLimits);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
/**
* A non-zero assigned resource means the allocation succeeded
*/
if (Resources.greaterThan(
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
/**
* After a successful allocation, update the parent queue's resource usage
*/
super.allocateResource(clusterResource, assignedToChild.getResource(),
nodeLabels);
/**
* Add the resources assigned by the child queue to the total assigned by this queue in this call
*/
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
LOG.info("assignedContainer" +
" queue=" + getQueueName() +
" usedCapacity=" + getUsedCapacity() +
" absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
" used=" + queueUsage.getUsed() +
" cluster=" + clusterResource);
} else {
break;
}
if (LOG.isDebugEnabled()) {
LOG.debug("ParentQ=" + getQueueName()
+ " assignedSoFarInThisIteration=" + assignment.getResource()
+ " usedCapacity=" + getUsedCapacity()
+ " absoluteUsedCapacity=" + getAbsoluteUsedCapacity());
}
if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
if (LOG.isDebugEnabled()) {
if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
LOG.debug("Not assigning more than one off-switch container," +
" assignments so far: " + assignment);
}
}
break;
}
}
return assignment;
}
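Main logic of ParentQueue.assignContainers:
1. Check whether the reporting node's labels match.
2. Check whether the reporting node's available resources meet minimumAllocation.
3. Check whether this queue's resource limit would be exceeded.
4. Once the checks pass, dispatch to the child queues for matching.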
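The parent-to-leaf recursion is easier to see on a toy queue tree. The sketch below is a memory-only simplification with invented names (`QueueTreeSketch`, `Parent`, `Leaf`, `usedRatio`); it skips labels, reservations and the off-switch loop, orders children by how lightly used they are, and makes at most one assignment per call. Treat it as an illustration of the four steps above under those assumptions, not as the scheduler's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;

// Toy queue hierarchy: parents delegate, leaves allocate.
class QueueTreeSketch {
    static final int MIN_ALLOCATION_MB = 1024;

    abstract static class Queue {
        final String name;
        final int maxMb;   // this queue's resource limit
        int usedMb;        // resources currently charged to this queue

        Queue(String name, int maxMb) {
            this.name = name;
            this.maxMb = maxMb;
        }

        double usedRatio() {
            return (double) usedMb / maxMb;
        }

        // Returns the memory assigned on this node, or 0 if nothing was assigned.
        abstract int assign(int nodeAvailableMb);
    }

    static class Leaf extends Queue {
        // Pending container requests in MB, served in FIFO order.
        final Deque<Integer> pendingMb = new ArrayDeque<>();

        Leaf(String name, int maxMb) {
            super(name, maxMb);
        }

        @Override
        int assign(int nodeAvailableMb) {
            Integer request = pendingMb.peekFirst();
            if (request == null || request > nodeAvailableMb || usedMb + request > maxMb) {
                return 0;
            }
            pendingMb.pollFirst();
            usedMb += request;
            return request;
        }
    }

    static class Parent extends Queue {
        final List<Queue> children = new ArrayList<>();

        Parent(String name, int maxMb) {
            super(name, maxMb);
        }

        @Override
        int assign(int nodeAvailableMb) {
            // Step 2: the node must have room for a minimum allocation.
            if (nodeAvailableMb < MIN_ALLOCATION_MB) {
                return 0;
            }
            // Step 3: this queue must stay within its limit.
            if (usedMb + MIN_ALLOCATION_MB > maxMb) {
                return 0;
            }
            // Step 4: try children, least-used first.
            List<Queue> ordered = new ArrayList<>(children);
            ordered.sort(Comparator.comparingDouble(Queue::usedRatio));
            for (Queue child : ordered) {
                int assigned = child.assign(nodeAvailableMb);
                if (assigned > 0) {
                    usedMb += assigned; // charge the parent as well
                    return assigned;
                }
            }
            return 0;
        }
    }

    public static void main(String[] args) {
        Parent root = new Parent("root", 8192);
        Leaf a = new Leaf("a", 4096);
        Leaf b = new Leaf("b", 4096);
        root.children.add(a);
        root.children.add(b);
        a.usedMb = 2048;
        root.usedMb = 2048;
        a.pendingMb.add(1024);
        b.pendingMb.add(1536);
        System.out.println("assigned MB: " + root.assign(4096));
    }
}
```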
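/**
* Check whether the node's labels match the queue's:
* 1. If the queue's label is an asterisk, it can access any node
* 2. If the node has no label, any queue can access it
* 3. If the queue has specific labels, it can only access nodes carrying one of those labels
* @param queueLabels
* @param nodeLabels
* @return
*/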
public static boolean checkQueueAccessToNode(Set<String> queueLabels,
Set<String> nodeLabels) {
if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
return true;
}
// any queue can access to a node without label
if (nodeLabels == null || nodeLabels.isEmpty()) {
return true;
}
// a queue can access to a node only if it contains any label of the node
if (queueLabels != null
&& Sets.intersection(queueLabels, nodeLabels).size() > 0) {
return true;
}
return false;
}
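Checking whether the reporting node's labels match:
1. If the queue's label is an asterisk, it can access any node.
2. If the node has no label, any queue can access it.
3. If the queue has specific labels, it can only access nodes carrying one of those labels.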
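The three rules can be tried out without a cluster. The sketch below restates them using only `java.util` (the listing above uses Guava's `Sets.intersection`); `LabelAccessSketch` and `queueCanAccessNode` are names invented for this post, and `"*"` is assumed to be the wildcard label.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Standalone restatement of the label-matching rules, for experimentation.
class LabelAccessSketch {
    // Assumed wildcard label meaning "any node".
    static final String ANY = "*";

    static boolean queueCanAccessNode(Set<String> queueLabels, Set<String> nodeLabels) {
        // Rule 1: a wildcard queue can access every node.
        if (queueLabels != null && queueLabels.contains(ANY)) {
            return true;
        }
        // Rule 2: an unlabeled node is open to every queue.
        if (nodeLabels == null || nodeLabels.isEmpty()) {
            return true;
        }
        // Rule 3: otherwise the queue and the node must share at least one label.
        return queueLabels != null && !Collections.disjoint(queueLabels, nodeLabels);
    }

    public static void main(String[] args) {
        Set<String> gpuQueue = new HashSet<>(Arrays.asList("gpu"));
        Set<String> gpuNode = new HashSet<>(Arrays.asList("gpu"));
        Set<String> ssdNode = new HashSet<>(Arrays.asList("ssd"));
        System.out.println(queueCanAccessNode(gpuQueue, gpuNode));
        System.out.println(queueCanAccessNode(gpuQueue, ssdNode));
    }
}
```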
/**
* Whether the reporting node's resources are usable: node.getAvailableResource() >= minimumAllocation
* 1. DefaultResourceCalculator evaluates this directly (on memory only) and does not need clusterResource
* 2. DominantResourceCalculator compares resource shares, so it needs clusterResource
*/
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
/**
* Whether the reporting node's resources are usable: node.getAvailableResource() >= minimumAllocation
* 1. DefaultResourceCalculator evaluates this directly (on memory only) and does not need clusterResource
* 2. DominantResourceCalculator compares resource shares, so it needs clusterResource
*/
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
node.getAvailableResource(), minimumAllocation);
}
This checks whether the reporting node's available resources meet minimumAllocation.
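Why the cluster total matters for one calculator and not the other can be shown with a small sketch. The two methods below are hypothetical simplifications written for this post: the first compares memory alone, the second compares the larger of the memory and vcore shares of the cluster and falls back to the smaller share on a tie. That is my reading of the dominant-share idea, not a transcription of the Hadoop classes.

```java
// Two ways to answer "is available >= minimum?"; simplified for illustration.
class CanAssignSketch {
    // Memory-only comparison: the cluster total is not needed.
    static boolean memoryOnlyAtLeast(int availableMb, int minimumMb) {
        return availableMb >= minimumMb;
    }

    // Share-based comparison: each resource is first expressed as a
    // fraction of the cluster total, then the dominant fractions are compared.
    static boolean dominantAtLeast(int clusterMb, int clusterVcores,
                                   int availableMb, int availableVcores,
                                   int minimumMb, int minimumVcores) {
        double availMemShare = (double) availableMb / clusterMb;
        double availCpuShare = (double) availableVcores / clusterVcores;
        double minMemShare = (double) minimumMb / clusterMb;
        double minCpuShare = (double) minimumVcores / clusterVcores;
        int byDominant = Double.compare(
                Math.max(availMemShare, availCpuShare),
                Math.max(minMemShare, minCpuShare));
        if (byDominant != 0) {
            return byDominant > 0;
        }
        // Tie on the dominant share: compare the other share.
        return Math.min(availMemShare, availCpuShare)
                >= Math.min(minMemShare, minCpuShare);
    }

    public static void main(String[] args) {
        System.out.println(memoryOnlyAtLeast(1024, 1024));
        System.out.println(dominantAtLeast(102400, 100, 512, 0, 1024, 1));
    }
}
```

Note that exactly meeting the minimum counts as assignable, which matches the `greaterThanOrEqual` call in canAssign.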
/**
* Check whether the allocation would push this queue past its resource limit
*
* @param clusterResource
* @param nodeLabels
* @param currentResourceLimits
* @param nowRequired
* @param resourceCouldBeUnreserved
* @return
*/
synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels, ResourceLimits currentResourceLimits,
Resource nowRequired, Resource resourceCouldBeUnreserved) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
}
for (String label : labelCanAccess) {
// New total resource = used + required
Resource newTotalResource =
Resources.add(queueUsage.getUsed(label), nowRequired);
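/**
* Resource limit for a queue without a label: min(this queue's resource limit, the limit passed down by the parent)
* Resource limit for a queue with a label: this queue's resource limit
*
* root is passed the resources of the whole cluster, so in the usual case the result is this queue's own resource limit
*/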
Resource currentLimitResource =
getCurrentLimitResource(label, clusterResource, currentResourceLimits);
/**
* If the allocation succeeded, would the queue exceed its resource limit?
*/
if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) {
if (this.reservationsContinueLooking
&& label.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
currentLimitResource));
return true;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(label)
+ ")");
}
return false;
}
return true;
}
return false;
}
This checks whether the allocation would push this queue past its resource limit.
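Stripped of labels and logging, the decision is a couple of comparisons. The sketch below is a memory-only simplification with invented names (`QueueLimitSketch`, `canAssignToQueue`, `unreservableMb`); it mirrors the structure of the listing above, including the path where giving up a reservation would bring the queue back under its limit, but it is only an illustration under those assumptions.

```java
// Memory-only version of the queue limit check, for illustration.
class QueueLimitSketch {
    static boolean canAssignToQueue(int usedMb, int requiredMb, int limitMb,
                                    int unreservableMb, boolean continueLooking) {
        // New total = what the queue already uses + what is being asked for.
        int newTotalMb = usedMb + requiredMb;
        if (newTotalMb <= limitMb) {
            return true;
        }
        // Over the limit: still allowed if releasing reserved resources
        // would bring the total back within the limit.
        if (continueLooking && unreservableMb > 0) {
            return newTotalMb - unreservableMb <= limitMb;
        }
        return false;
    }

    public static void main(String[] args) {
        // Queue limit 4096 MB, 3584 MB in use, asking for 1024 MB more.
        System.out.println(canAssignToQueue(3584, 1024, 4096, 0, true));
        // Same request, but 1024 MB of the usage is a reservation that could be released.
        System.out.println(canAssignToQueue(3584, 1024, 4096, 1024, true));
    }
}
```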
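/**
* Iterate over the child queues of the current queue, which raises the question of traversal order:
* CapacityScheduler implements a comparator for sorting queues.
* 1. First sort by available capacity: the more resources a queue has available, the earlier it comes
* 2. When the available resources are equal, sort by queue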