Hadoop之job提交流程原始碼簡析
1. 進入Job提交方法
/**
 * Submits the job if it is still being defined, then blocks until it finishes.
 *
 * <p>With {@code verbose} the job is followed via {@code monitorAndPrintJob()}; otherwise the
 * job status is polled every client-configured completion poll interval. An interrupt received
 * while polling does not abort the wait, but the thread's interrupt status is restored before
 * this method returns so callers can still observe it.
 *
 * @param verbose whether to monitor and print the job's progress
 * @return the result of {@code isSuccessful()} once waiting has ended
 * @throws IOException if submission or status retrieval fails
 * @throws InterruptedException if interrupted during submission or verbose monitoring
 * @throws ClassNotFoundException if a class required for submission cannot be found
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  boolean interrupted = false;
  try {
    // Only submit a job that has not been submitted yet (still in the DEFINE state).
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      // Follow the job and print its progress/information until it completes.
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis =
          Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
          // Keep waiting as before, but remember the interrupt instead of discarding it.
          interrupted = true;
        }
      }
    }
    return isSuccessful();
  } finally {
    // Restore the interrupt only after the final status call, so that call is not disturbed
    // by a pending interrupt.
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
1.1 提交Job到Cluster
/**
 * Submits this job to the cluster; it does not wait for the job to finish.
 *
 * <p>Connects to the cluster if necessary, lets a {@code JobSubmitter} submit the job while
 * acting as the job's user, stores the returned status and moves the job to RUNNING.
 */
public void submit()
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  // Create the Cluster on first use. Per the original author's note it carries the file
  // system used for job files and the client that runs the job (LocalJobRunner for local
  // mode, YARNRunner for YARN) — TODO confirm against Cluster.
  connect();
  final JobSubmitter jobSubmitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  // The actual submission, executed under the job's UserGroupInformation.
  PrivilegedExceptionAction<JobStatus> submitAction =
      new PrivilegedExceptionAction<JobStatus>() {
        public JobStatus run() throws IOException, InterruptedException,
            ClassNotFoundException {
          return jobSubmitter.submitJobInternal(Job.this, cluster);
        }
      };
  status = ugi.doAs(submitAction);
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
1.2 建立Cluster
/**
 * Creates the {@code Cluster} for this job's configuration on first call; later calls reuse it.
 * The method is synchronized, so concurrent calls to it do not create a second Cluster.
 */
private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster != null) {
    return; // already connected
  }
  // Build the Cluster from the user's configuration, acting as the job's user.
  PrivilegedExceptionAction<Cluster> createCluster =
      new PrivilegedExceptionAction<Cluster>() {
        public Cluster run()
            throws IOException, InterruptedException, ClassNotFoundException {
          return new Cluster(getConfiguration());
        }
      };
  cluster = ugi.doAs(createCluster);
}
1.3 使用Submitter提交Job
/**
 * Performs the client-side preparation of a job and hands it to the submit client.
 *
 * <p>Visible steps: validate the output specification, resolve the staging area, record the
 * submitting host, obtain a new job id, set up credentials (namenode delegation tokens and a
 * shuffle secret), copy job resources, compute the input splits (which sets the number of map
 * tasks), write the job configuration into the job submit directory and finally call
 * {@code submitClient.submitJob}. If no status is obtained, the job submit directory is
 * deleted.
 *
 * @param job the job to submit
 * @param cluster the target cluster, used to resolve the staging directory
 * @return the non-null status returned by the submit client
 * @throws IOException if a step fails or the submit client returns no status
 * @throws InterruptedException propagated from the cluster/submit-client calls made here
 * @throws ClassNotFoundException propagated from the helpers called here
 */
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {
  // Validate the output specification (per the original note: that the output directory is
  // valid and whether it exists).
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  // Root staging directory under which this job's submit directory is created.
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // Per-job submit directory: <staging area>/<job id>. Original author's note: for a local
  // submission this is under /tmp on the local drive; for a YARN submission it is on HDFS.
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
      conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
      // The trailing space keeps "intermediate" and "data" from running together.
      LOG.warn("Max job attempts set to 1 since encrypted intermediate " +
          "data spill is enabled");
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    // Original author's note: this writes the split file (the splits themselves, e.g. one
    // FileSplit per split for file input) and the split meta info (a description of each
    // split, including which nodes its data should be read from).
    int maps = writeSplits(job, submitJobDir);
    // The number of map tasks equals the number of splits.
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write the complete job configuration to the job conf file (job.xml per the original
    // note).
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    // Submission did not produce a status: remove the half-prepared submit directory.
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null) {
        jtFs.delete(submitJobDir, true);
      }
    }
  }
}
1.4 提交Job
/**
 * Submits a prepared job to the local runner.
 *
 * <p>Creates the runner's own {@code Job} from the job submit directory prepared by the
 * submitter, attaches the caller's credentials to it and returns that job's status.
 *
 * @param jobid the id assigned to the job during submission
 * @param jobSubmitDir the job submit directory written by the submitter
 * @param credentials the credentials gathered for the job
 * @return the status held by the newly created local job
 * @throws IOException if the local job cannot be created
 */
public org.apache.hadoop.mapreduce.JobStatus submitJob(
    org.apache.hadoop.mapreduce.JobID jobid, String jobSubmitDir,
    Credentials credentials) throws IOException {
  // Convert to the (unqualified) JobID type expected by the local Job constructor.
  JobID downgradedId = JobID.downgrade(jobid);
  // Rebuild the job from the earlier preparation work.
  Job localJob = new Job(downgradedId, jobSubmitDir);
  localJob.job.setCredentials(credentials);
  return localJob.status;
}
1.5 建立LocalJobRunner可以執行的Job物件
/**
 * Builds the local runner's representation of a submitted job and starts running it.
 *
 * <p>The body is abridged in this excerpt ("……"). Per the original author's note, it copies
 * the settings prepared during submission into this LocalJobRunner$Job.
 *
 * @param jobid the id of the submitted job
 * @param jobSubmitDir the job submit directory produced by the submitter
 * @throws IOException presumably if the submitted job files cannot be read — TODO confirm
 */
public Job(JobID jobid, String jobSubmitDir) throws IOException {
……
// Copy the various settings generated earlier for running the job into this LocalJobRunner$Job.
// Start a separate thread that runs the job (per the original note; see run()).
// NOTE(review): starting a thread from the constructor publishes `this` before construction
// has finished; keep this as the very last statement.
this.start();
}
1.6 Job的run()
@Override public void run() { JobID jobId = profile.getJobID(); // JobContext代表Job執行的上下文,可以獲取Job中所有的配置資訊 JobContext jContext = new JobContextImpl(job, jobId);
org.apache.hadoop.mapreduce.OutputCommitter outputCommitter = null; try { outputCommitter = createOutputCommitter(conf.getUseNewMapper(), jobId, conf); } catch (Exception e) { LOG.info("Failed to createOutputCommitter", e); return; }
try { // 根據切片資訊,建立TaskSplitMetaInfo陣列,有幾片,陣列大小就是幾 TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir);
int numReduceTasks = job.getNumReduceTasks(); outputCommitter.setupJob(jContext); status.setSetupProgress(1.0f); // 指定儲存所有MapTask輸出目錄的位置 Map<TaskAttemptID, MapOutputFile> mapOutputFiles = Collections.synchronizedMap(new HashMap<TaskAttemptID, MapOutputFile>()); // 建立執行的MapTask程序列表 List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables( taskSplitMetaInfos, jobId, mapOutputFiles);
initCounters(mapRunnables.size(), numReduceTasks); // 建立一個執行緒池 ExecutorService mapService = createMapExecutor(); // 執行所有的MapTask , 需要檢視MapTaskRunable的run() runTasks(mapRunnables, mapService, "map");
try { if (numReduceTasks > 0) { List<RunnableWithThrowable> reduceRunnables = getReduceTaskRunnables( jobId, mapOutputFiles); ExecutorService reduceService = createReduceExecutor(); runTasks(reduceRunnables, reduceService, "reduce"); } } finally { for (MapOutputFile output : mapOutputFiles.values()) { output.removeAll(); } } // delete the temporary directory in output directory outputCommitter.commitJob(jContext); status.setCleanupProgress(1.0f);
if (killed) { this.status.setRunState(JobStatus.KILLED); } else { this.status.setRunState(JobStatus.SUCCEEDED); }
JobEndNotifier.localRunnerNotification(job, status); } catch (Throwable t) { try { outputCommitter.abortJob(jContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED); } catch (IOException ioe) { LOG.info("Error cleaning up job:" + id); } status.setCleanupProgress(1.0f); if (killed) { this.status.setRunState(JobStatus.KILLED); } else { this.status.setRunState(JobStatus.FAILED); } LOG.warn(id, t);
JobEndNotifier.localRunnerNotification(job, status);
} finally { try { fs.delete(systemJobFile.getParent(), true); // delete submit dir localFs.delete(localJobFile, true); // delete local copy // Cleanup distributed cache localDistributedCacheManager.close(); } catch (IOException e) { LOG.warn("Error cleaning up "+id+": "+e); } } } |
2. 進入Map階段
2.1 進入MapTaskRunnable的run()
/**
 * Runs a single map task of the local job in the calling thread.
 *
 * <p>Any {@code Throwable} is recorded in {@code storedException} rather than thrown.
 */
public void run() {
  try {
    // Attempt id for this map task (attempt number 0).
    TaskAttemptID mapId = new TaskAttemptID(new TaskID(
        jobId, TaskType.MAP, taskId), 0);
    LOG.info("Starting task: " + mapId);
    mapIds.add(mapId);
    MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId,
        info.getSplitIndex(), 1);
    map.setUser(UserGroupInformation.getCurrentUser().
        getShortUserName());
    setupChildMapredLocalDirs(map, localConf);

    // Output-file handle for this map attempt, registered in the shared mapOutputFiles map.
    MapOutputFile mapOutput = new MROutputFiles();
    mapOutput.setConf(localConf);
    mapOutputFiles.put(mapId, mapOutput);

    map.setJobFile(localJobFile.toString());
    localConf.setUser(map.getUser());
    map.localizeConfiguration(localConf);
    map.setConf(localConf);
    try {
      map_tasks.getAndIncrement();
      myMetrics.launchMap(mapId);
      // Enter MapTask.run(), passing this Job as the task's umbilical.
      map.run(localConf, Job.this);
      myMetrics.completeMap(mapId);
    } finally {
      map_tasks.getAndDecrement();
    }

    LOG.info("Finishing task: " + mapId);
  } catch (Throwable e) {
    this.storedException = e;
  }
}
// NOTE(review): this brace presumably closes the enclosing MapTaskRunnable class, whose header
// is outside this excerpt.
}
2.2 MapTask的run()
@Override public void run(final JobConf job, final TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { this.umbilical = umbilical; // 判斷是否需要reduce階段 // Map階段,可以分為兩個階段: map: 呼叫Mapper的map()方法,對輸入的key-value進行處理 // sort : 當map()處理完,context.wirte(),將key-value儲存到檔案中! // 在儲存到檔案之前,會將所有的key-value進行排序,會經過排序的階段 if (isMapTask()) { // If there are no reducers then there won't be any sort. Hence the map // phase will govern the entire attempt's progress. if (conf.getNumReduceTasks() == 0) { mapPhase = getProgress().addPhase("map", 1.0f); } else { // If there are reducers then the entire attempt's progress will be // split between the map phase (67%) and the sort phase (33%). mapPhase = getProgress().addPhase("map", 0.667f); sortPhase = getProgress().addPhase("sort", 0.333f); } } TaskReporter reporter = startReporter(umbilical);
boolean useNewApi = job.getUseNewMapper(); initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask if (jobCleanup) { runJobCleanupTask(umbilical, reporter); return; } if (jobSetup) { runJobSetupTask(umbilical, reporter); return; } if (taskCleanup) { runTaskCleanupTask(umbilical, reporter); return; }
if (useNewApi) { runNewMapper(job, splitMetaInfo, umbilical, reporter); } else { runOldMapper(job, splitMetaInfo, umbilical, reporter); } done(umbilical, reporter); } |
2.3 執行Mapper
@SuppressWarnings("unchecked") private <INKEY,INVALUE,OUTKEY,OUTVALUE> void runNewMapper(final JobConf job, final TaskSplitIndex splitIndex, final TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, ClassNotFoundException, InterruptedException { // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID(), reporter); // make a mapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); // make the input format org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); // rebuild the input split org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); LOG.info("Processing split: " + split);
org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); org.apache.hadoop.mapreduce.RecordWriter output = null;
// get an output object if (job.getNumReduceTasks() == 0) { output = new NewDirectOutputCollector(taskContext, job, umbilical, reporter); } else { output = new NewOutputCollector(taskContext, |