Source Code Walkthrough - Yarn - ResourceManager 04 - MR Job Submission - Client-Side Analysis
0x05 RM Scheduling - MR Job Submission - Client-Side Analysis
5.1 mapreduce.Job
org.apache.hadoop.mapreduce.Job
As we all know, the driver code of an MR job usually ends with job.waitForCompletion(true), which submits the job and waits for it to finish. Our analysis starts from here:
public boolean waitForCompletion(boolean verbose)
    throws IOException, InterruptedException, ClassNotFoundException {
  if (state == Job.JobState.DEFINE) {
    // Submit the job
    submit();
  }
  if (verbose) {
    // Monitor the job and keep printing progress until it completes (success or failure)
    this.monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = getCompletionPollInterval(this.cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
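The non-verbose branch above is a plain "submit once, then poll" loop. The following is a minimal sketch of that pattern using only the JDK. MiniJob and all of its members are hypothetical stand-ins, not Hadoop classes, and unlike the original this sketch stops waiting when the thread is interrupted instead of swallowing the interrupt.

```java
// Hypothetical stand-in for Job; it only models the DEFINE -> RUNNING
// transition and a completion flag that flips after a few polls.
class MiniJob {
    enum State { DEFINE, RUNNING }

    private State state = State.DEFINE;
    private final int pollsUntilDone; // simulated amount of remaining work
    private int pollCount = 0;
    private int submitCount = 0;

    MiniJob(int pollsUntilDone) {
        this.pollsUntilDone = pollsUntilDone;
    }

    // Mirrors ensureState(DEFINE) followed by the switch to RUNNING.
    void submit() {
        if (state != State.DEFINE) {
            throw new IllegalStateException("Job already submitted");
        }
        submitCount++;
        state = State.RUNNING;
    }

    // Each call counts as one poll of the (simulated) cluster.
    boolean isComplete() {
        pollCount++;
        return pollCount >= pollsUntilDone;
    }

    boolean isSuccessful() {
        return true;
    }

    int submitCount() { return submitCount; }
    int pollCount() { return pollCount; }

    // Submit only if the job is still in DEFINE, then poll until complete.
    boolean waitForCompletion(long pollIntervalMillis) {
        if (state == State.DEFINE) {
            submit();
        }
        while (!isComplete()) {
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // Assumption of this sketch: give up waiting on interrupt.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return isSuccessful();
    }
}
```

Calling waitForCompletion a second time on the same MiniJob does not submit again, because the state check guards the submit() call, just as in the original method.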
Now let's look at the submit method:
// Submit the job to the cluster and return immediately
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  // Create the JobSubmitter
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      // Submit the job
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
As we can see, submit relies on the JobSubmitter class, which we look at next.
5.2 JobSubmitter
org.apache.hadoop.mapreduce.JobSubmitter
Next comes the submitter.submitJobInternal method. It contains a lot of code, so only the most important lines are shown here:
// Get the jobId
JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId);
// Submit the job
status = this.submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
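As shown, the code first obtains the jobID and then submits the job to submitClient, which is an implementation of the ClientProtocol interface. So the main work comes down to two steps: get the JobID, then submit the Job. We cover these two flows separately below. Because we are submitting the job to run on Yarn, the implementation actually used is YARNRunner.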
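The two-step flow (get an ID, then submit under that ID) depends only on an interface, so any implementation can be plugged in. Here is a minimal sketch of that shape. MiniClientProtocol, InMemoryRunner and MiniSubmitter are hypothetical names invented for illustration, and the ID format and the way the submit directory is built are assumptions of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, much-reduced stand-in for the ClientProtocol interface.
interface MiniClientProtocol {
    String getNewJobId();
    String submitJob(String jobId, String jobSubmitDir);
}

// In-memory implementation, playing the role YARNRunner plays on Yarn.
class InMemoryRunner implements MiniClientProtocol {
    private int nextId = 1;
    final List<String> submitted = new ArrayList<>();

    @Override
    public String getNewJobId() {
        // Assumed ID format, for illustration only.
        return String.format("job_%04d", nextId++);
    }

    @Override
    public String submitJob(String jobId, String jobSubmitDir) {
        submitted.add(jobId + "@" + jobSubmitDir);
        return "RUNNING";
    }
}

// The submitter only sees the interface, never the concrete runner.
class MiniSubmitter {
    private final MiniClientProtocol submitClient;

    MiniSubmitter(MiniClientProtocol submitClient) {
        this.submitClient = submitClient;
    }

    // Returns {jobId, status}: step 1 gets the ID, step 2 submits with it.
    String[] submitJobInternal(String stagingRoot) {
        String jobId = submitClient.getNewJobId();
        String submitDir = stagingRoot + "/" + jobId;
        String status = submitClient.submitJob(jobId, submitDir);
        return new String[] { jobId, status };
    }
}
```

The ID has to be obtained first because everything that follows, here the submit directory, is keyed on it.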
5.3 Getting the JobID
5.3.1 YARNRunner
org.apache.hadoop.mapred.YARNRunner
First, the submitClient.getNewJobID() call used above:
// This calls resMgrDelegate.getNewJobID to obtain the jobId
@Override
public JobID getNewJobID() throws IOException, InterruptedException {
  return resMgrDelegate.getNewJobID();
}
The method above uses resMgrDelegate, so let's see what ResourceMgrDelegate is:
5.3.2 ResourceMgrDelegate
Part of the ResourceMgrDelegate code:
public class ResourceMgrDelegate extends YarnClient {
  private YarnConfiguration conf;
  private ApplicationSubmissionContext application;
  private ApplicationId applicationId;
  @Private
  @VisibleForTesting
  protected YarnClient client;
  private Text rmDTService;

  /**
   * Delegate responsible for communicating with the Resource Manager's
   * {@link ApplicationClientProtocol}.
   * @param conf the configuration object.
   */
  public ResourceMgrDelegate(YarnConfiguration conf) {
    super(ResourceMgrDelegate.class.getName());
    this.conf = conf;
    this.client = YarnClient.createYarnClient();
    init(conf);
    start();
  }

  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    client.init(conf);
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    client.start();
    super.serviceStart();
  }

  @Override
  protected void serviceStop() throws Exception {
    client.stop();
    super.serviceStop();
  }

  // Get a new JobID
  public JobID getNewJobID() throws IOException, InterruptedException {
    try {
      // client here is a YarnClientImpl instance
      // Get an ApplicationSubmissionContext
      this.application = client.createApplication().getApplicationSubmissionContext();
      // Get the applicationId
      this.applicationId = this.application.getApplicationId();
      return TypeConverter.fromYarn(applicationId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
}
Do methods such as serviceInit and serviceStart look familiar? They should: the parent class, YarnClient, extends AbstractService.
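The lifecycle hooks above follow a template-method pattern: the public init/start/stop methods manage state and call protected hooks, and a delegate forwards each hook to the client it wraps. Below is a minimal sketch of that idea. MiniService, MiniClient and MiniDelegate are hypothetical names and only imitate the shape of AbstractService. The real class does considerably more, for example listener notification and failure handling.

```java
import java.util.List;

// Hypothetical miniature of the service-lifecycle idea; not the Hadoop class.
abstract class MiniService {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private State state = State.NOTINITED;
    protected final List<String> events; // records the order of hook calls

    MiniService(List<String> events) {
        this.events = events;
    }

    // Public lifecycle methods are final: they check state, run the hook,
    // then move to the next state.
    final void init() {
        if (state != State.NOTINITED) {
            throw new IllegalStateException("cannot init from " + state);
        }
        serviceInit();
        state = State.INITED;
    }

    final void start() {
        if (state != State.INITED) {
            throw new IllegalStateException("cannot start from " + state);
        }
        serviceStart();
        state = State.STARTED;
    }

    final void stop() {
        serviceStop();
        state = State.STOPPED;
    }

    final State getState() { return state; }

    // Hooks that subclasses override.
    protected void serviceInit() {}
    protected void serviceStart() {}
    protected void serviceStop() {}
}

// The wrapped client: just records that its hooks ran.
class MiniClient extends MiniService {
    MiniClient(List<String> events) { super(events); }
    @Override protected void serviceInit() { events.add("client.init"); }
    @Override protected void serviceStart() { events.add("client.start"); }
    @Override protected void serviceStop() { events.add("client.stop"); }
}

// The delegate drives the wrapped client from inside its own hooks and,
// like ResourceMgrDelegate, inits and starts itself in the constructor.
class MiniDelegate extends MiniService {
    private final MiniClient client;

    MiniDelegate(List<String> events) {
        super(events);
        this.client = new MiniClient(events); // must be set before init()
        init();
        start();
    }

    @Override protected void serviceInit() { client.init(); events.add("delegate.init"); }
    @Override protected void serviceStart() { client.start(); events.add("delegate.start"); }
    @Override protected void serviceStop() { client.stop(); events.add("delegate.stop"); }
}
```

Constructing a MiniDelegate therefore leaves both the delegate and its wrapped client in the STARTED state, which is why the delegate is usable for RPC-style calls immediately after construction.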
YarnClientImpl and ApplicationSubmissionContext also appear here; we cover them below.
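Going back to getNewJobID for a moment: its last step, TypeConverter.fromYarn, turns the ApplicationId into a JobID. The real conversion works on the ID objects (cluster timestamp plus sequence number), but the effect is easiest to see on the printed forms, which differ only in their prefix. The sketch below works purely on strings. IdSketch and its methods are hypothetical helpers, and the timestamp in the usage note is an arbitrary example value.

```java
// Hypothetical string-level illustration; Hadoop converts ID objects,
// not strings.
class IdSketch {
    // Printed form of an application ID: application_<clusterTimestamp>_<4-digit id>
    static String applicationIdString(long clusterTimestamp, int id) {
        return String.format("application_%d_%04d", clusterTimestamp, id);
    }

    // Same timestamp and sequence number, with the job_ prefix instead.
    static String toJobIdString(String applicationId) {
        String prefix = "application_";
        if (!applicationId.startsWith(prefix)) {
            throw new IllegalArgumentException("Not an application ID: " + applicationId);
        }
        return "job_" + applicationId.substring(prefix.length());
    }
}
```

For example, IdSketch.toJobIdString("application_1410450250506_0003") yields "job_1410450250506_0003", which shows that a job and its Yarn application share the same timestamp and sequence number.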
5.3.3 YarnClientImpl
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
First, the client.createApplication method that ResourceMgrDelegate called above:
@Override
public YarnClientApplication createApplication()
    throws YarnException, IOException {
  // Create an ApplicationSubmissionContextPBImpl instance.
  // ApplicationSubmissionContext holds all the information the RM needs
  // to launch the AM for the application.
  ApplicationSubmissionContext context = Records.newRecord
      (ApplicationSubmissionContext.class);
  // Get the response describing the new application
  GetNewApplicationResponse newApp = getNewApplication();
  ApplicationId appId = newApp.getApplicationId();
  // Store the appId in the ApplicationSubmissionContext
  context.setApplicationId(appId);
  // With the appId in hand, create an object wrapping the response and the context
  return new YarnClientApplication(newApp, context);
}
Next, the getNewApplication method:
private GetNewApplicationResponse getNewApplication()
    throws YarnException, IOException {
  GetNewApplicationRequest request =
      Records.newRecord(GetNewApplicationRequest.class);
  // rmClient is the key piece here
  return rmClient.getNewApplication(request);
}
The rmClient in the method above is an implementation of the ApplicationClientProtocol interface. Next, the rmClient.getNewApplication method.
5.3.4 ApplicationClientProtocolPBClientImpl
org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl
/**
 * This method is declared in ApplicationClientProtocol, whose Javadoc says:
 * clients use it to obtain a new ApplicationId for submitting a new application.
 * In response, the RM returns a GetNewApplicationResponse containing
 * a new, monotonically increasing ApplicationId plus cluster details
 * such as the maximum resource capability.
 *
 * In other words, this only obtains the appId; it does not actually run the app.
 */
@Override
public GetNewApplicationResponse getNewApplication(
    GetNewApplicationRequest request) throws YarnException,
    IOException {
  GetNewApplicationRequestProto requestProto =
      ((GetNewApplicationRequestPBImpl) request).getProto();
  try {
    return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null,
        requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}
That completes the client-side flow for obtaining the JobID. Next, the SubmitJob flow.
5.4 Submitting the Job
5.4.1 YARNRunner
Next, the YARNRunner submitJob method called from JobSubmitter:
@Override
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
    throws IOException, InterruptedException {
  addHistoryToken(ts);
  // Build an appContext from the required information, in preparation
  // for launching the MR ApplicationMaster
  ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);
  // Submit to the ResourceManager
  try {
    // This is the same applicationId obtained earlier by ResourceMgrDelegate.getNewJobID
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    // Application report for the AM
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
That covers the two YARNRunner methods used by the client, getNewJobID and submitJob. Both still belong to the MapReduce code; from here on we enter the yarn packages.
5.4.2 YarnClientImpl
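resMgrDelegate.submitApplication hands the call to the YarnClient it wraps, so we go straight to the submitApplication method of YarnClientImpl: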
@Override
public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
    throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);

  // Automatically add the timeline DT into the CLC
  // Only when the security and the timeline service are both enabled
  if (isSecurityEnabled() && timelineServiceEnabled) {
    addTimelineDelegationToken(appContext.getAMContainerSpec());
  }

  // Send the application submission request
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates =
      EnumSet.of(YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates =
      EnumSet.of(YarnApplicationState.FAILED,
          YarnApplicationState.KILLED);
  while (true) {
    try {
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      if (!waitingStates.contains(state)) {
        if (failToSubmitStates.contains(state)) {
          throw new YarnException("Failed to submit " + applicationId +
              " to YARN : " + appReport.getDiagnostics());
        }
        LOG.info("Submitted application " + applicationId);
        break;
      }

      long elapsedMillis = System.currentTimeMillis() - startTime;
      if (enforceAsyncAPITimeout() &&
          elapsedMillis >= asyncApiPollTimeoutMillis) {
        throw new YarnException("Timed out while waiting for application " +
            applicationId + " to be submitted successfully");
      }

      // Notify the client through the log every 10 poll, in case the client
      // is blocked here too long.
      if (++pollCount % 10 == 0) {
        LOG.info("Application submission is not finished, " +
            "submitted application " + applicationId +
            " is still in " + state);
      }
      try {
        Thread.sleep(submitPollIntervalMillis);
      } catch (InterruptedException ie) {
        LOG.error("Interrupted while waiting for application "
            + applicationId
            + " to be successfully submitted.");
      }
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}
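The loop above is a small state machine: keep polling while the application is in a "waiting" state, fail fast on FAILED or KILLED, give up after a timeout, and resubmit if the RM no longer knows the application. The sketch below isolates that logic using only the JDK. SubmitPollSketch, its Rm interface and ScriptedRm are hypothetical names. A null report stands in for ApplicationNotFoundException, and treating a negative timeout as "wait forever" is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;

class SubmitPollSketch {
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    static final EnumSet<AppState> WAITING =
        EnumSet.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED);
    static final EnumSet<AppState> FAILED_TO_SUBMIT =
        EnumSet.of(AppState.FAILED, AppState.KILLED);

    // Hypothetical view of the RM; report() returns null for "not found".
    interface Rm {
        void submit(String appId);
        AppState report(String appId);
    }

    static AppState submitAndWait(Rm rm, String appId, long pollMillis, long timeoutMillis) {
        rm.submit(appId);
        long start = System.currentTimeMillis();
        while (true) {
            AppState state = rm.report(appId);
            if (state == null) {
                // RM lost the application (e.g. restart): resubmit, poll again.
                rm.submit(appId);
                continue;
            }
            if (!WAITING.contains(state)) {
                if (FAILED_TO_SUBMIT.contains(state)) {
                    throw new IllegalStateException("Failed to submit " + appId);
                }
                return state; // left the waiting states: submission is done
            }
            if (timeoutMillis >= 0 && System.currentTimeMillis() - start >= timeoutMillis) {
                throw new IllegalStateException("Timed out waiting for " + appId);
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for " + appId, ie);
            }
        }
    }

    // Fake RM that replays a fixed script of reports, repeating the last one.
    static class ScriptedRm implements Rm {
        private final List<AppState> script;
        private int next = 0;
        int submits = 0;

        ScriptedRm(AppState... states) {
            this.script = Arrays.asList(states);
        }

        @Override public void submit(String appId) { submits++; }

        @Override public AppState report(String appId) {
            AppState s = script.get(Math.min(next, script.size() - 1));
            next++;
            return s;
        }
    }
}
```

With a script of NEW, null, SUBMITTED, ACCEPTED, submitAndWait returns ACCEPTED after two submit calls: the initial one plus the resubmission triggered by the "not found" report. Note that success here only means the application has left the waiting states, not that it has finished running.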
5.4.3 ApplicationClientProtocolPBClientImpl
@Override
public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
  SubmitApplicationRequestProto requestProto =
      ((SubmitApplicationRequestPBImpl) request).getProto();
  try {
    return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
  } catch (ServiceException e) {
    RPCUtil.unwrapAndThrowException(e);
    return null;
  }
}