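YARN Source Code Analysis: The AM-RM Protocol and Obtaining Resources
In my previous posts I analyzed the source code of SLS, the YARN Scheduler Load Simulator, focusing on the communication protocol between the AM and the RM.
Next, let's see how AM-RM communication is implemented in YARN itself.
Note: YARN only ships complete implementations of the RM and the NM. For the AM and the client it provides APIs, and users implement those components themselves.
The AM's main job is to request resources from the RM according to the application's needs and then use those resources to run the application logic. The AM therefore has to communicate with the RM as well as with the NMs.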
Communication protocols:
AM-RM: ApplicationMasterProtocol
AM-NM: ContainerManagementProtocol
Both protocols are defined in hadoop-yarn-api.
AM-RM protocol:
The AM and the RM communicate through ApplicationMasterProtocol, which provides the following methods:
1. Register the AM with the RM
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnException, IOException;
2. Tell the RM that the application has finished
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnException, IOException;
3. Request resources from the RM
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException;
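After the client submits an application to the RM, the RM allocates resources to launch the AM. Once started, the AM calls registerApplicationMaster on ApplicationMasterProtocol to register itself with the RM. After registering, it calls allocate to request the resources its tasks need. With those resources in hand, it launches containers through the AM-NM protocol, ContainerManagementProtocol, to carry out the tasks. When the work is done, it calls finishApplicationMaster on ApplicationMasterProtocol to report that the application has finished and to unregister the AM.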
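The register, allocate-in-a-loop, finish sequence above can be sketched as a tiny state machine. This is a minimal sketch, not Hadoop code: `ToyAmProtocol`, `ToyRm` and `ToyAppMaster` are hypothetical stand-ins for ApplicationMasterProtocol, the RM and an AM, and requests and responses are reduced to container-id strings so the example runs without Hadoop on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for ApplicationMasterProtocol.
interface ToyAmProtocol {
    void register(String amHost);
    List<String> allocate(int numContainersWanted); // returns container ids
    void finish(boolean succeeded);
}

// Hypothetical in-memory "RM": grants at most 2 containers per allocate call
// and rejects allocate calls before registration or after finishing.
class ToyRm implements ToyAmProtocol {
    private boolean registered;
    private boolean finished;
    private int nextId;

    public void register(String amHost) { registered = true; }

    public List<String> allocate(int numContainersWanted) {
        if (!registered || finished) {
            throw new IllegalStateException("AM not registered");
        }
        List<String> granted = new ArrayList<>();
        for (int i = 0; i < Math.min(2, numContainersWanted); i++) {
            granted.add("container_" + (nextId++));
        }
        return granted;
    }

    public void finish(boolean succeeded) { finished = true; }

    boolean isFinished() { return finished; }
}

class ToyAppMaster {
    // Drives register -> allocate* -> finish and returns all container ids received.
    static List<String> run(ToyAmProtocol rm, int tasks) {
        rm.register("am-host");
        List<String> containers = new ArrayList<>();
        while (containers.size() < tasks) {
            containers.addAll(rm.allocate(tasks - containers.size()));
            // A real AM would now launch each container via the NM
            // (ContainerManagementProtocol); that step is omitted here.
        }
        rm.finish(true);
        return containers;
    }
}
```

Because the toy RM grants at most two containers per call, asking for five tasks takes three allocate rounds, which mirrors why a real AM keeps calling allocate on every heartbeat instead of expecting everything in one reply.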
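Now let's look in detail at how the AM requests resources from the RM:
1. Registering with the RM and requesting resources
The AM requests or releases resources through the allocate method; the request is wrapped in an AllocateRequest.
Let's take Hadoop's own AM implementation for MapReduce as an example: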
org.apache.hadoop.mapreduce.v2.app.MRAppMaster.java
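serviceInit first prepares the job: it sets up the job class loader and credentials, then checks the staging directory and the state of any commit started by an earlier AM attempt (parts are elided below):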
@Override
protected void serviceInit(final Configuration conf) throws Exception {
// create the job classloader if enabled
createJobClassLoader(conf);
initJobCredentialsAndUGI(conf);
......
if(!stagingExists) {
isLastAMRetry = true;
LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
" is last retry: " + isLastAMRetry +
" because the staging dir doesn't exist.");
errorHappenedShutDown = true;
forcedState = JobStateInternal.ERROR;
shutDownMessage = "Staging dir does not exist " + stagingDir;
LOG.error(shutDownMessage);
} else if (commitStarted) {
//A commit was started so this is the last time, we just need to know
// what result we will use to notify, and how we will unregister
errorHappenedShutDown = true;
isLastAMRetry = true;
LOG.info("Attempt num: " + appAttemptID.getAttemptId() +
" is last retry: " + isLastAMRetry +
" because a commit was started.");
copyHistory = true;
if (commitSuccess) {
shutDownMessage =
"Job commit succeeded in a prior MRAppMaster attempt " +
"before it crashed. Recovering.";
forcedState = JobStateInternal.SUCCEEDED;
} else if (commitFailure) {
shutDownMessage =
"Job commit failed in a prior MRAppMaster attempt " +
"before it crashed. Not retrying.";
forcedState = JobStateInternal.FAILED;
} else {
if (isCommitJobRepeatable()) {
// cleanup previous half done commits if committer supports
// repeatable job commit.
errorHappenedShutDown = false;
cleanupInterruptedCommit(conf, fs, startCommitFile);
} else {
//The commit is still pending, commit error
shutDownMessage =
"Job commit from a prior MRAppMaster attempt is " +
"potentially in progress. Preventing multiple commit executions";
forcedState = JobStateInternal.ERROR;
}
}
}
......
// service to allocate containers from RM (if non-uber) or to fake it (uber)
containerAllocator = createContainerAllocator(null, context);
addIfService(containerAllocator);
dispatcher.register(ContainerAllocator.EventType.class, containerAllocator);
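Near the end of serviceInit, it creates the component that will obtain container resources from the RM by calling createContainerAllocator, adds it as a service, and registers it with the dispatcher as the handler for ContainerAllocator events.
createContainerAllocator is implemented as follows: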
protected ContainerAllocator createContainerAllocator(
final ClientService clientService, final AppContext context) {
return new ContainerAllocatorRouter(clientService, context);
}
It simply creates a new ContainerAllocatorRouter and returns it.
ContainerAllocatorRouter is implemented as follows:
private final class ContainerAllocatorRouter extends AbstractService
implements ContainerAllocator, RMHeartbeatHandler {
private final ClientService clientService;
private final AppContext context;
private ContainerAllocator containerAllocator;
ContainerAllocatorRouter(ClientService clientService,
AppContext context) {
super(ContainerAllocatorRouter.class.getName());
this.clientService = clientService;
this.context = context;
}
@Override
protected void serviceStart() throws Exception {
if (job.isUber()) {
MRApps.setupDistributedCacheLocal(getConfig());
this.containerAllocator = new LocalContainerAllocator(
this.clientService, this.context, nmHost, nmPort, nmHttpPort
, containerID);
} else {
this.containerAllocator = new RMContainerAllocator(
this.clientService, this.context, preemptionPolicy);
}
((Service)this.containerAllocator).init(getConfig());
((Service)this.containerAllocator).start();
super.serviceStart();
}
@Override
protected void serviceStop() throws Exception {
ServiceOperations.stop((Service) this.containerAllocator);
super.serviceStop();
}
@Override
public void handle(ContainerAllocatorEvent event) {
this.containerAllocator.handle(event);
}
public void setSignalled(boolean isSignalled) {
((RMCommunicator) containerAllocator).setSignalled(isSignalled);
}
public void setShouldUnregister(boolean shouldUnregister) {
((RMCommunicator) containerAllocator).setShouldUnregister(shouldUnregister);
}
@Override
public long getLastHeartbeatTime() {
return ((RMCommunicator) containerAllocator).getLastHeartbeatTime();
}
@Override
public void runOnNextHeartbeat(Runnable callback) {
((RMCommunicator) containerAllocator).runOnNextHeartbeat(callback);
}
}
After ContainerAllocatorRouter is returned, serviceInit continues and adds it as a child service of MRAppMaster (which is a CompositeService):
addIfService(containerAllocator);
When MRAppMaster starts, its child services are started as well, so the router's serviceStart runs:
@Override
protected void serviceStart() throws Exception {
if (job.isUber()) {
MRApps.setupDistributedCacheLocal(getConfig());
this.containerAllocator = new LocalContainerAllocator(
this.clientService, this.context, nmHost, nmPort, nmHttpPort
, containerID);
} else {
this.containerAllocator = new RMContainerAllocator(
this.clientService, this.context, preemptionPolicy);
}
((Service)this.containerAllocator).init(getConfig());
((Service)this.containerAllocator).start();
super.serviceStart();
}
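This method checks whether the job is an uber job. Uber mode is Hadoop's optimization for small MR jobs: all of the job's tasks run inside the AM's own JVM, so you can think of it as a local mode.
If the job is not uber, an RMContainerAllocator is created and then initialized and started.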
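ContainerAllocatorRouter is essentially a delegating wrapper: the concrete allocator is chosen only when the service starts, and every later call is forwarded to it. A minimal sketch of that pattern, using hypothetical names (`Allocator`, `LocalAllocator`, `RemoteAllocator`, `AllocatorRouter`) and string events instead of the real Hadoop classes:

```java
// Hypothetical simplified allocator interface (not Hadoop's ContainerAllocator).
interface Allocator {
    String handle(String event);
}

// Stand-in for LocalContainerAllocator: uber jobs run tasks inside the AM JVM.
class LocalAllocator implements Allocator {
    public String handle(String event) { return "local:" + event; }
}

// Stand-in for RMContainerAllocator: non-uber jobs ask the RM for containers.
class RemoteAllocator implements Allocator {
    public String handle(String event) { return "rm:" + event; }
}

// Same shape as ContainerAllocatorRouter: the delegate is chosen in start(),
// and handle() simply forwards to it.
class AllocatorRouter implements Allocator {
    private final boolean uber;
    private Allocator delegate;

    AllocatorRouter(boolean uber) { this.uber = uber; }

    void start() {
        delegate = uber ? new LocalAllocator() : new RemoteAllocator();
    }

    public String handle(String event) {
        if (delegate == null) {
            throw new IllegalStateException("router not started");
        }
        return delegate.handle(event);
    }
}
```

Callers such as the dispatcher only ever hold the router, so they do not need to know which allocator was picked.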
Let's look at this class:
org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator.java
The constructor initializes the member fields from its arguments:
public RMContainerAllocator(ClientService clientService, AppContext context,
AMPreemptionPolicy preemptionPolicy) {
super(clientService, context);
this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
this.clock = context.getClock();
this.assignedRequests = createAssignedRequests();
}
The init call in ((Service)this.containerAllocator).init(getConfig()) ends up in serviceInit:
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
reduceSlowStart = conf.getFloat(
MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART,
DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART);
maxReduceRampupLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_RAMPUP_UP_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_RAMP_UP_LIMIT);
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
reducerUnconditionalPreemptionDelayMs = 1000 * conf.getInt(
MRJobConfig.MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_UNCONDITIONAL_PREEMPT_DELAY_SEC);
reducerNoHeadroomPreemptionDelayMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
mapNodeLabelExpression = conf.get(MRJobConfig.MAP_NODE_LABEL_EXP);
reduceNodeLabelExpression = conf.get(MRJobConfig.REDUCE_NODE_LABEL_EXP);
// Init startTime to current time. If all goes well, it will be reset after
// first attempt to contact RM.
retrystartTime = System.currentTimeMillis();
this.scheduledRequests.setNumOpportunisticMapsPercent(
conf.getInt(MRJobConfig.MR_NUM_OPPORTUNISTIC_MAPS_PERCENT,
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
"% of the mappers will be scheduled using OPPORTUNISTIC containers");
}
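This reads the parameters that govern how containers are requested: the reduce slow-start threshold, the reduce ramp-up and preemption limits, the maximum numbers of running maps and reduces, the retry interval for contacting the RM, and the node-label expressions for maps and reduces. The heartbeat interval itself is read in the superclass RMCommunicator, via super.serviceInit.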
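To make one of these parameters concrete: reduceSlowStart (mapreduce.job.reduce.slowstart.completedmaps, 0.05 by default) is the fraction of maps that must complete before reducers are scheduled. Based on my reading of scheduleReduces, the threshold is computed as ceil(reduceSlowStart × totalMaps). The hypothetical `SlowStart` helper below sketches only that check, not the full scheduling method.

```java
class SlowStart {
    // Number of completed maps required before reducers may be scheduled:
    // ceil(slowStart * totalMaps), cast to int as in the original check.
    static int requiredCompletedMaps(float slowStart, int totalMaps) {
        return (int) Math.ceil(slowStart * totalMaps);
    }

    // True once enough maps have completed to start scheduling reducers.
    static boolean mayScheduleReduces(float slowStart, int totalMaps, int completedMaps) {
        return completedMaps >= requiredCompletedMaps(slowStart, totalMaps);
    }
}
```

For example, with slowStart = 0.5 and 7 maps, reducers would only be considered once 4 maps have completed.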
Similarly, ((Service)this.containerAllocator).start() ends up in serviceStart:
@Override
protected void serviceStart() throws Exception {
this.eventHandlingThread = new Thread() {
@SuppressWarnings("unchecked")
@Override
public void run() {
ContainerAllocatorEvent event;
while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
try {
event = RMContainerAllocator.this.eventQueue.take();
} catch (InterruptedException e) {
if (!stopped.get()) {
LOG.error("Returning, interrupted : " + e);
}
return;
}
try {
handleEvent(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " to the ContainreAllocator", t);
// Kill the AM
eventHandler.handle(new JobEvent(getJob().getID(),
JobEventType.INTERNAL_ERROR));
return;
}
}
}
};
this.eventHandlingThread.start();
super.serviceStart();
}
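The serviceStart above is a classic single-consumer event loop: other components put ContainerAllocatorEvents on eventQueue, and one dedicated thread takes them off and handles them in order until it is interrupted or stopped is set. A minimal sketch of the same pattern with plain java.util.concurrent types; `ToyEventLoop`, the string events and `stopAfterDrain` are hypothetical, and recording an event in a list stands in for handleEvent.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class ToyEventLoop {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean stopped = new AtomicBoolean(false);
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private Thread eventHandlingThread;

    void start() {
        eventHandlingThread = new Thread(() -> {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
                String event;
                try {
                    event = eventQueue.take(); // blocks until an event arrives
                } catch (InterruptedException e) {
                    return; // interrupted while waiting: exit the loop
                }
                handled.add(event); // stand-in for handleEvent(event)
            }
        });
        eventHandlingThread.start();
    }

    void post(String event) { eventQueue.add(event); }

    // Waits until every posted event has been taken, then stops the consumer.
    void stopAfterDrain() {
        try {
            while (!eventQueue.isEmpty()) {
                Thread.sleep(1);
            }
            stopped.set(true);
            eventHandlingThread.interrupt();
            eventHandlingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<String> handled() { return handled; }
}
```

Using a single consumer thread means events are handled strictly in arrival order without extra locking inside the handler.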
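This serviceStart launches a thread that takes ContainerAllocatorEvents off eventQueue and handles them one by one. The call to super.serviceStart() (in RMCommunicator) registers the AM with the RM and starts the allocator thread, after which the allocator runs in heartbeat mode,
calling the class's heartbeat method periodically: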
@Override
protected synchronized void heartbeat() throws Exception {
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
if (allocatedContainers != null && allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers);
}
int completedMaps = getJob().getCompletedMaps();
int completedTasks = completedMaps + getJob().getCompletedReduces();
if ((lastCompletedTasks != completedTasks) ||
(scheduledRequests.maps.size() > 0)) {
lastCompletedTasks = completedTasks;
recalculateReduceSchedule = true;
}
if (recalculateReduceSchedule) {
boolean reducerPreempted = preemptReducesIfNeeded();
if (!reducerPreempted) {
// Only schedule new reducers if no reducer preemption happens for
// this heartbeat
scheduleReduces(getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
mapResourceRequest, reduceResourceRequest, pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
}
recalculateReduceSchedule = false;
}
scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
The most important call in this method is getResources(), which obtains resources:
List<Container> allocatedContainers = getResources();
Once containers are obtained, they are handed to scheduledRequests.assign, which assigns them to pending task requests:
if (allocatedContainers != null && allocatedContainers.size() > 0) {
scheduledRequests.assign(allocatedContainers);
}
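One heartbeat therefore boils down to "fetch containers, then hand them to pending work". A minimal sketch of that shape, with heavy simplifications: `ToyHeartbeat` is hypothetical, a `Supplier` stands in for getResources(), and tasks are assigned first-come-first-served, whereas the real scheduledRequests.assign also matches on priority, host and rack.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class ToyHeartbeat {
    private final Deque<String> pendingTasks = new ArrayDeque<>();
    private final Map<String, String> assigned = new LinkedHashMap<>(); // container -> task
    private final Supplier<List<String>> resourceFetcher; // stand-in for getResources()

    ToyHeartbeat(List<String> tasks, Supplier<List<String>> resourceFetcher) {
        pendingTasks.addAll(tasks);
        this.resourceFetcher = resourceFetcher;
    }

    // One heartbeat: ask for containers, then give each one to a pending task.
    void heartbeat() {
        List<String> allocated = resourceFetcher.get();
        if (allocated != null && !allocated.isEmpty()) {
            for (String container : allocated) {
                String task = pendingTasks.poll();
                if (task == null) {
                    break; // more containers than tasks; a real AM would release the extras
                }
                assigned.put(container, task);
            }
        }
    }

    Map<String, String> assigned() { return assigned; }

    int pending() { return pendingTasks.size(); }
}
```

A heartbeat that returns no containers simply leaves the pending tasks waiting for the next round.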
Next, let's go through getResources piece by piece:
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
applyConcurrentTaskLimits();
// will be null the first time
Resource headRoom = Resources.clone(getAvailableResources());
AllocateResponse response;
/*
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
* milliseconds before aborting. During this interval, AM will still try
* to contact the RM.
*/
try {
response = makeRemoteRequest();
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();
The key line here is response = makeRemoteRequest(); this is the method through which the AM asks the RM for resources.
The try block is followed by catch clauses:
} catch (ApplicationAttemptNotFoundException e ) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new RMContainerAllocationException(
"Resource Manager doesn't recognize AttemptId: "
+ this.getContext().getApplicationAttemptId(), e);
} catch (ApplicationMasterNotRegisteredException e) {
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
+ " hence resync and send outstanding requests.");
// RM may have restarted, re-register with RM.
lastResponseID = 0;
register();
addOutstandingRequestOnResync();
return null;
} catch (InvalidLabelResourceRequestException e) {
// If Invalid label exception is received means the requested label doesnt
// have access so killing job in this case.
String diagMsg = "Requested node-label-expression is invalid: "
+ StringUtils.stringifyException(e);
LOG.info(diagMsg);
JobId jobId = this.getJob().getID();
eventHandler.handle(new JobDiagnosticsUpdateEvent(jobId, diagMsg));
eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
throw e;
} catch (Exception e) {
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.JOB_AM_REBOOT));
throw new RMContainerAllocationException("Could not contact RM after " +
retryInterval + " milliseconds.");
}
// Throw this up to the caller, which may decide to ignore it and
// continue to attempt to contact the RM.
throw e;
}
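These clauses handle exceptions thrown by makeRemoteRequest: the RM no longer recognizing the application attempt (for example after an RM restart), the AM being out of sync with the RM and having to re-register, an invalid node-label expression, and any other failure, such as a lost connection to the RM.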
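The generic catch (Exception e) branch implements a retry window: every successful call resets retrystartTime, and a failure only becomes fatal once the RM has been unreachable for at least retryInterval milliseconds (MR_AM_TO_RM_WAIT_INTERVAL_MS). A minimal sketch of that logic; `RetryWindow` is hypothetical, and the clock is injected as a `LongSupplier` (instead of System.currentTimeMillis()) so the behavior can be tested.

```java
import java.util.function.LongSupplier;

class RetryWindow {
    private final long retryIntervalMs;
    private final LongSupplier clock;
    private long retryStartTime;

    RetryWindow(long retryIntervalMs, LongSupplier clock) {
        this.retryIntervalMs = retryIntervalMs;
        this.clock = clock;
        this.retryStartTime = clock.getAsLong(); // like retrystartTime in serviceInit
    }

    // After a successful RM call: restart the window.
    void onSuccess() { retryStartTime = clock.getAsLong(); }

    // After a failed RM call: true means give up (the real code sends
    // JOB_AM_REBOOT and throws RMContainerAllocationException); false means
    // rethrow and let a later heartbeat try again.
    boolean shouldGiveUp() {
        return clock.getAsLong() - retryStartTime >= retryIntervalMs;
    }
}
```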
Next:
Resource newHeadRoom = getAvailableResources();
List<Container> newContainers = response.getAllocatedContainers();
This takes the containers the RM has allocated out of the response and stores them in List<Container> newContainers; these are the resources the AM asked for.
Next:
// Setting NMTokens
if (response.getNMTokens() != null) {
for (NMToken nmToken : response.getNMTokens()) {
NMTokenCache.setNMToken(nmToken.getNodeId().toString(),
nmToken.getToken());
}
}
// Setting AMRMToken
if (response.getAMRMToken() != null) {
updateAMRMToken(response.getAMRMToken());
}
List<ContainerStatus> finishedContainers =
response.getCompletedContainersStatuses();
// propagate preemption requests
final PreemptionMessage preemptReq = response.getPreemptionMessage();
if (preemptReq != null) {
preemptionPolicy.preempt(
new PreemptionContext(assignedRequests), preemptReq);
}
if (newContainers.size() + finishedContainers.size() > 0
|| !headRoom.equals(newHeadRoom)) {
//something changed
recalculateReduceSchedule = true;
if (LOG.isDebugEnabled() && !headRoom.equals(newHeadRoom)) {
LOG.debug("headroom=" + newHeadRoom);
}
}
if (LOG.isDebugEnabled()) {
for (Container cont : newContainers) {
LOG.debug("Received new Container :" + cont);
}
}
//Called on each allocation. Will know about newly blacklisted/added hosts.
computeIgnoreBlacklisting();
handleUpdatedNodes(response);
handleJobPriorityChange(response);
// Handle receiving the timeline collector address and token for this app.
MRAppMaster.RunningAppContext appContext =
(MRAppMaster.RunningAppContext)this.getContext();
if (appContext.getTimelineV2Client() != null) {
appContext.getTimelineV2Client().
setTimelineCollectorInfo(response.getCollectorInfo());
}
for (ContainerStatus cont : finishedContainers) {
processFinishedContainer(cont);
}
return newContainers;
}
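After this processing (NM and AMRM tokens, preemption requests, blacklisting, node updates, job priority changes, timeline collector info, and finished containers), getResources returns newContainers.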
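One detail worth isolating in getResources is when it sets recalculateReduceSchedule: only if containers were allocated or completed, or if the headroom reported by the RM changed since the previous heartbeat. A minimal sketch of that predicate; `ToyResource` is a hypothetical simplification of YARN's Resource, and `Objects.equals` is used so a missing (null) first headroom is handled safely.

```java
import java.util.Objects;

// Hypothetical simplified Resource: <memory, vCores>.
record ToyResource(long memoryMb, int vcores) {}

class ChangeDetector {
    // Same condition as in getResources:
    // newContainers.size() + finishedContainers.size() > 0 || headroom changed
    static boolean somethingChanged(int newContainers, int finishedContainers,
                                    ToyResource oldHeadroom, ToyResource newHeadroom) {
        return newContainers + finishedContainers > 0
                || !Objects.equals(oldHeadroom, newHeadroom);
    }
}
```

Skipping the reduce-schedule recalculation when nothing changed keeps quiet heartbeats cheap.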
Next, let's analyze makeRemoteRequest, which getResources calls. It is defined in:
org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.java
The makeRemoteRequest method:
protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
applyRequestLimits();
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
new ArrayList<String>(blacklistRemovals));
AllocateRequest allocateRequest =
AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest);
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
int numCompletedContainers =
allocateResponse.getCompletedContainersStatuses().size();
if (ask.size() > 0 || release.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ allocateResponse.getAllocatedContainers().size()
+ " finishedContainers=" + numCompletedContainers
+ " resourcelimit=" + availableResources + " knownNMs="
+ clusterNmCount);
}
ask.clear();
release.clear();
if (numCompletedContainers > 0) {
// re-send limited requests when a container completes to trigger asking
// for more containers
requestLimitsToUpdate.addAll(requestLimits.keySet());
}
if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
LOG.info("Update the blacklist for " + applicationId +
": blacklistAdditions=" + blacklistAdditions.size() +
" blacklistRemovals=" + blacklistRemovals.size());
}
blacklistAdditions.clear();
blacklistRemovals.clear();
return allocateResponse;
}
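It builds an AllocateRequest with AllocateRequest.newInstance, passing lastResponseID, the current application progress from getApplicationProgress(), copies of the ask and release lists, and the blacklist request: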
AllocateRequest allocateRequest =
AllocateRequest.newInstance(lastResponseID,
super.getApplicationProgress(), new ArrayList<ResourceRequest>(ask),
new ArrayList<ContainerId>(release), blacklistRequest);
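This yields a complete AllocateRequest that can be sent to the RM.
Next, scheduler.allocate sends the request to the RM (scheduler is the AM's ApplicationMasterProtocol proxy to the RM), and the RM replies with an AllocateResponse.
Does scheduler.allocate(allocateRequest) look familiar?
It is exactly the third method of ApplicationMasterProtocol introduced at the beginning: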
public AllocateResponse allocate(AllocateRequest request) throws YarnException, IOException;
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
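This completes one round trip: the AM requests resources and the RM replies.
makeRemoteRequest then records lastResponseID, the available resources (headroom) and the cluster node count, clears the ask, release and blacklist lists, and returns allocateResponse.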
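The bookkeeping in makeRemoteRequest follows a simple cycle: ask and release accumulate between heartbeats, are copied into the outgoing request, are cleared only after the call succeeds, and lastResponseID is updated from the reply so the next request carries it. A minimal sketch of that cycle with hypothetical types (`ToyAllocateRequest`, `ToyAllocateResponse`, `ToyRequestor`); a `Function` stands in for scheduler.allocate.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

record ToyAllocateRequest(int responseId, float progress,
                          List<String> ask, List<String> release) {}

record ToyAllocateResponse(int responseId, List<String> allocated) {}

class ToyRequestor {
    private final Set<String> ask = new LinkedHashSet<>();
    private final Set<String> release = new LinkedHashSet<>();
    private final Function<ToyAllocateRequest, ToyAllocateResponse> scheduler;
    private int lastResponseId = 0;

    ToyRequestor(Function<ToyAllocateRequest, ToyAllocateResponse> scheduler) {
        this.scheduler = scheduler;
    }

    void addAsk(String request) { ask.add(request); }

    void addRelease(String containerId) { release.add(containerId); }

    ToyAllocateResponse makeRemoteRequest(float progress) {
        // Copy the pending sets so the request is an immutable snapshot.
        ToyAllocateRequest request = new ToyAllocateRequest(lastResponseId, progress,
                new ArrayList<>(ask), new ArrayList<>(release));
        ToyAllocateResponse response = scheduler.apply(request); // may throw
        lastResponseId = response.responseId();
        // Cleared only after a successful call, as in the original method.
        ask.clear();
        release.clear();
        return response;
    }

    int lastResponseId() { return lastResponseId; }
}
```

If scheduler.apply throws, ask and release are left intact, so the same requests are sent again on the next heartbeat.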
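Finally, let's look at the AllocateRequest and AllocateResponse classes in detail.
AllocateRequest: the message the AM sends to the RM to request resources
org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest.java
AllocateRequest contains:
1. responseID: the ID of the last response the AM received, used to detect duplicate requests
2. appProgress: the application's progress
3. askList (List<ResourceRequest> resourceAsk): the resources the AM requests from the RM. Each ResourceRequest describes one request in detail, including the number of containers, the container capability, and placement preferences such as the resource name (a host, a rack, or * for any node) and whether locality may be relaxed.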
ResourceRequest is defined in hadoop-yarn-api/src/main/proto/yarn_protos.proto:
message ResourceRequestProto {
optional PriorityProto priority = 1;
optional string resource_name = 2;
optional ResourceProto capability = 3;
optional int32 num_containers = 4;
optional bool relax_locality = 5 [default = true];
optional string node_label_expression = 6;
optional ExecutionTypeRequestProto execution_type_request = 7;
optional int64 allocation_request_id = 8 [default = -1];
}
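4. releaseList (List<ContainerId>): containers the AM is giving back to the RM
5. resourceBlacklistRequest: resources (nodes) to add to or remove from the AM's blacklist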
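How the ResourceRequest fields fit together is easiest to see with locality. To my understanding, the MR AM expresses a task's data locality by adding one request per preferred host, one per rack, and one for ANY (resource_name "*"), and requests with the same priority, resource name and capability are aggregated by summing num_containers. The sketch below models only that aggregation; `AskTable`, the priority value and the host and rack names are hypothetical, and capability is reduced to a memory size.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AskTable {
    // Key: priority / resource name / capability (memory MB only, for brevity).
    // Value: aggregated num_containers for that key.
    private final Map<String, Integer> numContainers = new LinkedHashMap<>();

    private static String key(int priority, String resourceName, int memoryMb) {
        return priority + "/" + resourceName + "/" + memoryMb;
    }

    private void add(int priority, String resourceName, int memoryMb) {
        numContainers.merge(key(priority, resourceName, memoryMb), 1, Integer::sum);
    }

    // One task that prefers the given hosts and racks adds a request at every
    // locality level: each host, each rack, and "*" (ANY).
    void addTask(int priority, int memoryMb, List<String> hosts, List<String> racks) {
        for (String host : hosts) {
            add(priority, host, memoryMb);
        }
        for (String rack : racks) {
            add(priority, rack, memoryMb);
        }
        add(priority, "*", memoryMb);
    }

    int numContainers(int priority, String resourceName, int memoryMb) {
        return numContainers.getOrDefault(key(priority, resourceName, memoryMb), 0);
    }
}
```

The "*" entry always counts every task, which is what lets the scheduler fall back to any node when relax_locality allows it.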
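AllocateResponse: the reply the RM sends back to the AM
org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse.java
It contains:
1. responseId: the ID of this response, used to detect duplicate responses
2. numClusterNodes: the number of nodes in the cluster
3. completedContainersStatuses: the statuses of containers that have completed
4. allocatedContainers: containers newly allocated to the AM by the RM. Each one is wrapped in a Container object, so the type is List<Container>.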
A Container is created as follows:
org.apache.hadoop.yarn.api.records.Container.java
public static Container newInstance(ContainerId containerId, NodeId nodeId,
String nodeHttpAddress, Resource resource, Priority priority,
Token containerToken, ExecutionType executionType) {
Container container = Records.newRecord(Container.class);
container.setId(containerId);
container.setNodeId(nodeId);
container.setNodeHttpAddress(nodeHttpAddress);
container.setResource(resource);
container.setPriority(priority);
container.setContainerToken(containerToken);
container.setExecutionType(executionType);
return container;
}
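So a Container consists of:
a. container ID
b. Node ID
c. nodeHttpAddress: the HTTP address of the node
d. resource: a Resource object, i.e. <memory, vCores>
e. priority: the priority
f. containerToken: the container token
g. executionType: the container's execution type (GUARANTEED or OPPORTUNISTIC)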
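5. updatedNodes: the nodes whose state has changed; each node's update is stored in a NodeReport, so the type is List<NodeReport>
6. amCommand: a control command from the RM to the AM, such as resync or shutdown
7. preemptionMessage: preemption information, in two parts: a strict contract (containers that will be reclaimed) and a negotiable contract (resources the AM may choose how to give back)
8. nmTokens: tokens the AM uses to communicate with the NMs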
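The responseId in both messages exists so that a lost reply can be recovered. As I understand the RM side (ApplicationMasterService.allocate), if the request's responseId is one less than the last response's id, the AM is resending a heartbeat that was already answered, and the RM returns the cached last response; any other mismatch is rejected as invalid. A minimal sketch of that check; `ToyRmSide` and its string responses are hypothetical.

```java
class ToyRmSide {
    private int lastResponseId = 0;
    private String lastResponse = null;

    String allocate(int requestResponseId) {
        if (requestResponseId + 1 == lastResponseId) {
            // Duplicate of an already-answered heartbeat: resend the cached reply.
            return lastResponse;
        }
        if (requestResponseId != lastResponseId) {
            throw new IllegalStateException("Invalid responseId " + requestResponseId
                    + ", expected " + lastResponseId);
        }
        // Normal case: process the request and bump the response id.
        lastResponseId++;
        lastResponse = "response#" + lastResponseId;
        return lastResponse;
    }
}
```

This is why the AM copies getResponseId() from each reply into lastResponseID: it tells the RM exactly which reply the AM last saw.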
That's all.