YARN: FairScheduler Deep Dive (NodeUpdate, assignContainer)
阿新 · Published 2020-12-11
Tags: Hadoop, YARN, FairScheduler
一、Overview
First, the main things YARN's FairScheduler does:
① Handle NM heartbeats (NodeUpdate) and assign containers.
② Maintain the tree of queues and applications, periodically compute fair share and related information, and keep everything sorted.
This article focuses on ①.
二、Code
1、Overall flow
① FairScheduler receives the heartbeat
public void handle(SchedulerEvent event) {
  switch (event.getType()) {
  ....
  case NODE_UPDATE:
    if (!(event instanceof NodeUpdateSchedulerEvent)) {
      throw new RuntimeException("Unexpected event type: " + event);
    }
    // Entry point: receive the NODE_UPDATE event and start processing it
    NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent) event;
    nodeUpdate(nodeUpdatedEvent.getRMNode());
    break;
  ....
  default:
    LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
  }
}
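For context, NODE_UPDATE events are produced on the RM side each time an NM heartbeat arrives: RMNodeImpl's status-update transition posts a NodeUpdateSchedulerEvent to the RM's async dispatcher, and a single scheduler thread consumes it via the handle() method above. Below is a self-contained toy model of that producer/consumer dispatch pattern; the types here are simplified stand-ins, not Hadoop classes.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DispatchSketch {
  public static void main(String[] args) throws InterruptedException {
    // Heartbeats enqueue node ids; one scheduler thread drains the queue.
    BlockingQueue<String> events = new LinkedBlockingQueue<>();
    Thread schedulerThread = new Thread(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          String nodeId = events.take();       // analogue of FairScheduler.handle(NODE_UPDATE)
          System.out.println("nodeUpdate for " + nodeId);
        }
      } catch (InterruptedException ignored) {
        // shut down quietly
      }
    });
    schedulerThread.start();
    events.put("nm-1:8041");                   // an NM heartbeat arrives
    Thread.sleep(100);                         // let the consumer run
    schedulerThread.interrupt();
  }
}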
② FairScheduler.nodeUpdate
/**
 * Handle a heartbeat from one NM.
 */
private synchronized void nodeUpdate(RMNode nm) {
  long start = getClock().getTime();
  if (LOG.isDebugEnabled()) {
    LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
  }
  eventLog.log("HEARTBEAT", nm.getHostName());
  // Look up the scheduler-side view of the NM that sent the heartbeat
  FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
  // Fetch the container status changes reported by this NM
  List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
  List<ContainerStatus> newlyLaunchedContainers = new ArrayList<ContainerStatus>();
  List<ContainerStatus> completedContainers = new ArrayList<ContainerStatus>();
  // Split out the containers launched and the containers completed since
  // the previous heartbeat
  for (UpdatedContainerInfo containerInfo : containerInfoList) {
    newlyLaunchedContainers.addAll(containerInfo.getNewlyLaunchedContainers());
    completedContainers.addAll(containerInfo.getCompletedContainers());
  }
  // Process newly launched containers and remove them from the expiry
  // monitor. Whenever the RM allocates a container, it is added to an
  // expiry list so that a container the AM leaves unused for too long is
  // reclaimed instead of wasting resources.
  for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
    containerLaunchedOnNode(launchedContainer.getContainerId(), node);
  }
  // Process completed containers and update cluster resource accounting
  for (ContainerStatus completedContainer : completedContainers) {
    ContainerId containerId = completedContainer.getContainerId();
    LOG.debug("Container FINISHED: " + containerId);
    completedContainer(getRMContainer(containerId),
        completedContainer, RMContainerEventType.FINISHED);
  }
  // Pull and apply per-container resource usage reports
  List<ContainerResourceToReport> containerResourceToReports =
      nm.pullContainerResourceUpdates();
  for (ContainerResourceToReport containerResource : containerResourceToReports) {
    updateContainerResource(containerResource);
  }
  if (continuousSchedulingEnabled) {
    // Continuous scheduling; not covered in this article
    if (!balanceSchedulingEnabled && !completedContainers.isEmpty()) {
      attemptScheduling(node);
    }
  } else {
    // Core container-assignment path
    attemptScheduling(node);
  }
  // Record how long this nodeUpdate call took, for metrics
  long duration = getClock().getTime() - start;
  fsOpDurations.addNodeUpdateDuration(duration);
}
Drilling into the details.
Handling containers launched during the previous heartbeat interval:
containerLaunchedOnNode
protected synchronized void containerLaunchedOnNode(
    ContainerId containerId, SchedulerNode node) {
  // Look up the application this container belongs to
  SchedulerApplicationAttempt application =
      getCurrentAttemptForContainer(containerId);
  if (application == null) {
    LOG.info("Unknown application "
        + containerId.getApplicationAttemptId().getApplicationId()
        + " launched container " + containerId + " on node: " + node);
    this.rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
    return;
  }
  // The application is still alive
  application.containerLaunchedOnNode(containerId, node.getNodeID());
}
which in turn calls SchedulerApplicationAttempt.containerLaunchedOnNode:
public synchronized void containerLaunchedOnNode(ContainerId containerId,
    NodeId nodeId) {
  // Look up the RM's record of this container
  RMContainer rmContainer = getRMContainer(containerId);
  if (rmContainer == null) {
    // The RM has no record of this container, so tell the NM to kill it
    rmContext.getDispatcher().getEventHandler()
        .handle(new RMNodeCleanContainerEvent(nodeId, containerId));
    return;
  }
  // Fire the LAUNCHED event
  rmContainer.handle(new RMContainerEvent(containerId,
      RMContainerEventType.LAUNCHED));
}
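The LAUNCHED event matters because of the allocation-expiry monitoring mentioned above: in RMContainerImpl, the state transition triggered by RMContainerEventType.LAUNCHED unregisters the container from the ContainerAllocationExpirer, so a container the NM has actually started can no longer be reclaimed for sitting unused. A toy model of that bookkeeping (simplified types, not the Hadoop classes):

import java.util.HashSet;
import java.util.Set;

public class ExpirySketch {
  // Container ids whose "use it or lose it" clock is still running
  private final Set<String> pendingExpiry = new HashSet<>();

  void onAllocated(String containerId) {
    pendingExpiry.add(containerId);    // allocated but not yet launched
  }

  void onLaunched(String containerId) {
    pendingExpiry.remove(containerId); // NM reported it running: stop the clock
  }

  boolean eligibleForExpiry(String containerId) {
    return pendingExpiry.contains(containerId);
  }

  public static void main(String[] args) {
    ExpirySketch sketch = new ExpirySketch();
    sketch.onAllocated("container_1");
    sketch.onLaunched("container_1");
    System.out.println(sketch.eligibleForExpiry("container_1")); // false
  }
}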
③ FairScheduler.attemptScheduling
synchronized boolean attemptScheduling(FSSchedulerNode node) {
  boolean hasAssigned = false;
  if (rmContext.isWorkPreservingRecoveryEnabled()
      && !rmContext.isSchedulerReadyForAllocatingContainers()) {
    // Not ready to allocate containers yet; return false
    return hasAssigned;
  }
  final NodeId nodeID = node.getNodeID();
  // The node is gone; return false
  if (!nodes.containsKey(nodeID)) {
    // The node might have just been removed while this thread was waiting
    // on the synchronized lock before it entered this synchronized method
    LOG.info("Skipping scheduling as the node " + nodeID +
        " has been removed");
    return hasAssigned;
  }

  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
  // If there is a reservation, try to satisfy it first; if it cannot be
  // satisfied, wait for the next heartbeat. Only once the reservation is
  // fulfilled can other applications get containers on this node.
  if (reservedAppSchedulable != null) {
    SchedulerRequestKey schedulerKey =
        node.getReservedContainer().getReservedSchedulerKey();
    FSQueue queue = reservedAppSchedulable.getQueue();
    // The node qualifies only if both locality and remaining resources are
    // satisfied, and the queue's allocated + about-to-be-allocated
    // resources stay within its most recently computed max fair share
    if (!reservedAppSchedulable.hasContainerForNode(schedulerKey, node)
        || !fitsInMaxShare(queue,
            node.getReservedContainer().getReservedResource())) {
      // Don't hold the reservation if app can no longer use it
      LOG.info("Releasing reservation that cannot be satisfied for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node " + node);
      // One of the two conditions failed: the node or the queue is full,
      // so release the reservation
      reservedAppSchedulable.unreserve(schedulerKey, node);
      reservedAppSchedulable = null;
    } else {
      // Reservation exists; try to fulfill the reservation
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to fulfill reservation for application "
            + reservedAppSchedulable.getApplicationAttemptId()
            + " on node: " + node);
      }
      // Try to allocate the container, or keep the resources reserved
      Resource assignedReservedContainer =
          node.getReservedAppSchedulable().assignReservedContainer(node);
      hasAssigned = (assignedReservedContainer != Resources.none());
    }
  }
  // No reservation: run the normal assignment path
  if (reservedAppSchedulable == null) {
    // No reservation, schedule at queue which is farthest below fair share
    int assignedContainers = 0;
    Resource assignedResource = Resources.clone(Resources.none());
    // At most 50% of the node's currently available resources may be
    // assigned in one pass
    Resource maxResourcesToAssign =
        Resources.multiply(node.getAvailableResource(), 0.5f);
    // Keep going while the node has no reservation and its remaining
    // resources still fit a minimum allocation, i.e. try to use the node up
    while (node.getReservedContainer() == null &&
        Resources.fitsIn(minimumAllocation, node.getAvailableResource())) {
      boolean assignedContainer = false;
      long start = System.nanoTime();
      // Offer this node to the queues in childQueues in sorted order,
      // assigning containers until resources have to be reserved.
      // assignContainer, the key method, is analyzed in detail below.
      Resource assignment = queueMgr.getRootQueue().assignContainer(node);
      // Resources were assigned: update counters and record the duration
      if (!assignment.equals(Resources.none())) {
        assignedContainers++;
        assignedContainer = true;
        hasAssigned = true;
        Resources.addTo(assignedResource, assignment);
        long end = System.nanoTime();
        fsOpDurations.addAssignContainerCallDuration((end - start) / 1000);
      }
      // Nothing was assigned: the node is out of resources or there are no
      // pending containers, so there is no point looping further
      if (!assignedContainer) { break; }
      // If assignMultiple is disabled, stop after a single assignment; also
      // stop once the number of containers assigned in this heartbeat
      // reaches the configured cap maxAssign; otherwise keep looping
      if (!shouldContinueAssigning(assignedContainers,
          maxResourcesToAssign, assignedResource)) {
        break;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Schedule node:" + node.getNodeName() + "; " +
          "Max resources to assign:" + maxResourcesToAssign + "; " +
          "Assign container numbers:" + assignedContainers);
    }
  }
  updateRootQueueMetrics();
  return hasAssigned;
}
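To make the loop-termination rule concrete, here is a self-contained sketch of the shouldContinueAssigning check, an approximation rather than the exact upstream source. assignMultiple corresponds to yarn.scheduler.fair.assignmultiple, maxAssign to yarn.scheduler.fair.max.assign, and the dynamic variant caps the total resources handed out in one heartbeat at maxResourcesToAssign (the 50% of free node resources computed above).

public class AssignLoopSketch {
  static boolean shouldContinueAssigning(boolean assignMultiple,
      boolean dynamicMaxAssign, int maxAssign, int assignedContainers,
      long assignedMb, long maxMbToAssign) {
    if (!assignMultiple) {
      return false;                          // one container per heartbeat
    }
    if (dynamicMaxAssign) {
      return assignedMb < maxMbToAssign;     // resource-based cap
    }
    return maxAssign <= 0 || assignedContainers < maxAssign; // count-based cap
  }

  public static void main(String[] args) {
    // Count-based: with maxAssign = 3, stop after the third container.
    System.out.println(shouldContinueAssigning(true, false, 3, 3, 0, 0));      // false
    // Dynamic: stop once half of the node's free memory has been handed out.
    System.out.println(shouldContinueAssigning(true, true, 0, 5, 2048, 4096)); // true
  }
}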
④ FSParentQueue.assignContainer
public Resource assignContainer(FSSchedulerNode node) {
  Resource assigned = Resources.none();
  // The queue's allocated + about-to-be-allocated resources exceed its
  // limit, or the node already has a reserved container
  if (!assignContainerPreCheck(node)) {
    LOG.info("Cannot assign container to queue: " + getQueueName() +
        ", for it usage resource over its limit.");
    return assigned;
  }
  boolean isRoundEnd = false;
  readLock.lock();
  // pendingChildQueues is childQueues filtered down to queues that still
  // have pending demand. This optimization has not been contributed
  // upstream; you can read it simply as childQueues.
  int childQueuesSize = pendingChildQueues.size();
  int preScheduleIndex = createScheduleIndex(childQueuesSize);
  try {
    // Walk the sorted queues from the head, satisfying requests one by one
    while (getCurrentScheduleIndex() < childQueuesSize) {
      FSQueue child = pendingChildQueues.get(getCurrentScheduleIndex());
      // Once we hit a queue whose demand is none, every queue after it is
      // none too (the sort order places queues with less demand later)
      if (child.getDemand().equals(Resources.none())) {
        incCurrentScheduleIndex();
        isRoundEnd = true;
        break;
      }
      // Hand the node down to the child queue / application
      assigned = child.assignContainer(node);
      // An assignment succeeded, so stop here
      if (!Resources.equals(assigned, Resources.none())) {
        break;
      }
      // Move the queue pointer forward
      incCurrentScheduleIndex();
    }
    // Every demand in pendingChildQueues has been visited
    if (!isRoundEnd && getCurrentScheduleIndex() >= childQueuesSize) {
      isRoundEnd = true;
    }
  } finally {
    readLock.unlock();
  }
  // A full round covered all of pendingChildQueues' demand: reset the
  // pointer back to the head of the queue list
  if (parent == null && isRoundEnd) {
    resetCurrentScheduleIndex();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Node " + node.getNodeName() +
        ", offered to queue: " + getName() +
        ", prev schedule index:" + preScheduleIndex +
        ", current schedule index:" + getCurrentScheduleIndex() +
        ", child queue count:" + childQueuesSize +
        ", assigned:" + assigned);
  }
  return assigned;
}
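For reference, here is a minimal self-contained model of what the stock upstream FSParentQueue.assignContainer does in place of the pendingChildQueues/schedule-index walk above: re-sort the children with the scheduling policy's comparator on every call, then offer the node to each child in order until one accepts. This is an illustrative sketch with toy types (Child stands in for FSQueue, and the usage ratio for the FairSharePolicy comparator), not the actual Hadoop source.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ParentQueueSketch {
  static class Child {
    final String name;
    final double usageToFairShareRatio; // lower = farther below fair share
    Child(String name, double ratio) {
      this.name = name;
      this.usageToFairShareRatio = ratio;
    }
    boolean tryAssign() { return true; } // pretend the child always accepts
  }

  static String assignContainer(List<Child> children) {
    // Upstream re-sorts on every call; the custom version above avoids this
    // by keeping a schedule index into an already-sorted pending list.
    children.sort(Comparator.comparingDouble((Child c) -> c.usageToFairShareRatio));
    for (Child child : children) {
      if (child.tryAssign()) {
        return child.name; // first child that accepts wins
      }
    }
    return "none";
  }

  public static void main(String[] args) {
    List<Child> kids = new ArrayList<>();
    kids.add(new Child("queueA", 0.9));
    kids.add(new Child("queueB", 0.2));
    System.out.println(assignContainer(kids)); // queueB: farthest below fair share
  }
}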
⑤ FSLeafQueue.assignContainer
public Resource assignContainer(FSSchedulerNode node) {
  Resource assigned = Resources.none();
  // Same pre-check as FSParentQueue.assignContainer, plus a node label
  // check: if the labels don't match, return without assigning
  if (!nodeLabelCheck(node.getNodeID()) || !assignContainerPreCheck(node)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Node: " + node.getNodeID() +
          " cannot assigned to queue: " + getQueueName() +
          " for resource usage may over its limits or node label does not match.");
    }
    return assigned;
  }
  readLock.lock();
  int runnableAppsSize = pendingRunnableApps.size();
  int preScheduleIndex = createScheduleIndex(runnableAppsSize);
  try {
    for ( ; getCurrentScheduleIndex() < runnableAppsSize; incCurrentScheduleIndex()) {
      FSAppAttempt appAttempt = pendingRunnableApps.get(getCurrentScheduleIndex());
      // Skip requests the RM has no record of
      if (!appAttempt.isInRMContext()) {
        LOG.error(appAttempt.getApplicationId()
            + " is not in rmContext, this is not expected");
        continue;
      }
      // The NM is on this application's blacklist: don't assign
      if (SchedulerAppUtils.isBlacklisted(appAttempt, node, LOG)) {
        continue;
      }
      Resource pending = appAttempt.getAppAttemptResourceUsage().getPending();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Assign container on: " + node.getNodeName()
            + ", app attempt id:" + appAttempt.getName()
            + ", app demand:" + appAttempt.getDemand()
            + ", app resource usage:" + appAttempt.getResourceUsage()
            + ", pending:" + pending);
      }
      // The app has pending demand: try to assign
      if (!pending.equals(Resources.none())) {
        assigned = appAttempt.assignContainer(node);
        if (!assigned.equals(Resources.none())) {
          // An assignment succeeded, so no further assignment is needed
          break;
        }
      }
    }
  } finally {
    readLock.unlock();
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Node " + node.getNodeName() +
        ", offered to queue: " + getName() +
        ", prev schedule index:" + preScheduleIndex +
        ", current schedule index:" + getCurrentScheduleIndex() +
        ", runnable apps count:" + runnableAppsSize +
        ", assigned:" + assigned);
  }
  return assigned;
}
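The blacklist check in the loop above is conceptually simple: the app attempt keeps a set of blacklisted hosts and racks (built from AM-reported resource requests), and a node is skipped if its host or rack is in that set. A hypothetical stand-in to show the idea (this toy isBlacklisted is not the SchedulerAppUtils signature):

import java.util.Set;

public class BlacklistSketch {
  // Toy version: the real check consults the attempt's blacklist state
  static boolean isBlacklisted(Set<String> appBlacklist, String host, String rack) {
    return appBlacklist.contains(host) || appBlacklist.contains(rack);
  }

  public static void main(String[] args) {
    Set<String> blacklist = Set.of("bad-host-1", "/rack-7");
    System.out.println(isBlacklisted(blacklist, "bad-host-1", "/rack-1")); // true
    System.out.println(isBlacklisted(blacklist, "good-host", "/rack-1"));  // false
  }
}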
⑥ FSAppAttempt.assignContainer
public Resource assignContainer(FSSchedulerNode node) {
  // If the AMs in this queue already use more than the queue's AM share
  // limit (by default maxShare * 0.5f), don't assign
  if (isOverAMShareLimit()) {
    return Resources.none();
  }
  return assignContainer(node, false);
}
The next layer of assignContainer first determines the locality level of the request, of which there are three: NODE_LOCAL, RACK_LOCAL, and OFF_SWITCH. They mean exactly what the names suggest; see the sketch below for how the allowed level is chosen.
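Here is a simplified, self-contained model of the locality relaxation done in FSAppAttempt (an assumption-level sketch of getAllowedLocalityLevel, not the real signature): after missing a configurable number of scheduling opportunities, an app is allowed to fall back NODE_LOCAL -> RACK_LOCAL -> OFF_SWITCH. The thresholds model yarn.scheduler.fair.locality.threshold.node and .rack, expressed as fractions of the cluster size.

public class LocalitySketch {
  enum NodeType { NODE_LOCAL, RACK_LOCAL, OFF_SWITCH }

  static NodeType allowedLocality(long missedOpportunities, int numNodes,
      double nodeThreshold, double rackThreshold) {
    if (missedOpportunities < nodeThreshold * numNodes) {
      return NodeType.NODE_LOCAL;
    } else if (missedOpportunities < (nodeThreshold + rackThreshold) * numNodes) {
      return NodeType.RACK_LOCAL;
    }
    return NodeType.OFF_SWITCH;
  }

  public static void main(String[] args) {
    // With 100 nodes and thresholds of 0.1, the app waits ~10 missed
    // opportunities before relaxing to RACK_LOCAL, ~20 before OFF_SWITCH.
    System.out.println(allowedLocality(5, 100, 0.1, 0.1));  // NODE_LOCAL
    System.out.println(allowedLocality(15, 100, 0.1, 0.1)); // RACK_LOCAL
    System.out.println(allowedLocality(25, 100, 0.1, 0.1)); // OFF_SWITCH
  }
}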
The innermost assignContainer:
private Resource assignContainer(
    FSSchedulerNode node, ResourceRequest request, NodeType type,
    boolean reserved, SchedulerRequestKey schedulerKey) {
  // Check the node label
  if (!SchedulerUtils.checkNodeLabelExpression(node.getLabels(),
      request.getNodeLabelExpression())) {
    return Resources.none();
  }
  // Resources needed by this request
  Resource capability = request.getCapability();
  // Resources currently free on this NM
  Resource available = node.getAvailableResource();
  Container container = null;
  // If there is a reservation, use the reserved container; otherwise
  // create a new one from the capability and schedulerKey
  if (reserved) {
    container = node.getReservedContainer().getContainer();
  } else {
    container = createContainer(node, capability, schedulerKey);
  }
  // Requested resources fit into the NM's free resources
  if (Resources.fitsIn(capability, available)) {
    // Allocate and fire the start event
    RMContainer allocatedContainer =
        allocate(type, node, schedulerKey, request, container);
    // Another thread already allocated enough containers; skip this one
    if (allocatedContainer == null) {
      // Clear the reservation
      if (reserved) {
        unreserve(schedulerKey, node);
      }
      return Resources.none();
    }
    // Clear the reservation
    if (reserved) {
      unreserve(schedulerKey, node);
    }
    // Update the node's bookkeeping (container count, available resources, ...)
    node.allocateContainer(allocatedContainer);
    // The AM is not running yet, so the first container is the AM; record
    // the AM resource information
    if (!isAmRunning() && !getUnmanagedAM()) {
      setAMResource(container.getResource());
      getQueue().addAMResourceUsage(container.getResource());
      setAmRunning(true);
    }
    return container.getResource();
  } else {
    // Not enough free resources to allocate
    // If the request would exceed the queue's max share, don't reserve
    if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
      return Resources.none();
    }
    // Not enough resources, so reserve instead. Roughly: if there is no
    // existing reservation, create a new RMContainer; if one already
    // exists, add this request to currentReservation and fire the
    // reserve event.
    if (reserve(schedulerKey, node, container, type, reserved)) {
      return FairScheduler.CONTAINER_RESERVED;
    } else {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Couldn't create reservation for app: " + getName()
            + ", at priority " + request.getPriority());
      }
      return Resources.none();
    }
  }
}
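To make the reserve-or-allocate decision above concrete, here is a self-contained toy model (plain MB integers standing in for Resource objects, not the Hadoop API): when a request does not fit, the app parks a reservation on the node and retries on later heartbeats, which prevents a large request from being perpetually starved by a stream of smaller ones.

public class ReservationSketch {
  static final int RESERVED = -1; // analogue of FairScheduler.CONTAINER_RESERVED

  int nodeAvailableMb = 3072;
  Integer reservedMb = null;      // null = no reservation parked on this node

  // Returns the MB actually allocated, or RESERVED if we parked a reservation
  int assign(int requestMb) {
    if (requestMb <= nodeAvailableMb) {
      reservedMb = null;           // fulfill (and clear) any reservation
      nodeAvailableMb -= requestMb;
      return requestMb;
    }
    reservedMb = requestMb;        // not enough room: reserve and wait
    return RESERVED;
  }

  public static void main(String[] args) {
    ReservationSketch node = new ReservationSketch();
    System.out.println(node.assign(4096)); // -1: reserved, waits for resources
    node.nodeAvailableMb += 2048;          // other containers finished meanwhile
    System.out.println(node.assign(4096)); // 4096: reservation fulfilled
  }
}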
三、Summary
This article walked through the code by which FairScheduler handles NM NodeUpdate heartbeats and assigns containers. The other half of the picture, maintaining the tree of queues and applications and periodically computing and sorting fair share information, was covered in an earlier article.
Overall, this part of the YARN code base is fairly simple and readable. Articles on CapacityScheduler will follow.