Hadoop 2 Job Execution: Job Submission
There are two common ways to submit a Hadoop job: remote submission from an IDE, which is typically used for testing, and command-line submission from a client, which is what production uses.
The general steps for submitting a job program are:
1. Package the program into a jar;
2. Upload the jar to HDFS;
3. Submit the job program on HDFS from the command line.
Following these steps, let's start with command-line submission.
The simplest submission command looks like this:
hadoop jar /home/hadoop/hadoop-2.2.0/hadoop-examples.jar wordcount inputPath outputPath
In the shell script named hadoop, when the command argument is jar,
the script determines the class to run (org.apache.hadoop.util.RunJar, as shown next) and the environment variables, and finally launches it with exec.
Let's look at the main method of org.apache.hadoop.util.RunJar:
public static void main(String[] args) throws Throwable {
  String usage = "RunJar jarFile [mainClass] args...";
  // validate the number of submitted arguments
  if (args.length < 1) {
    System.err.println(usage);
    System.exit(-1);
  }
  // verify that the jar file exists
  int firstArg = 0;
  String fileName = args[firstArg++];
  File file = new File(fileName);
  if (!file.exists() || !file.isFile()) {
    System.err.println("Not a valid JAR: " + file.getCanonicalPath());
    System.exit(-1);
  }
  String mainClassName = null;

  JarFile jarFile;
  try {
    jarFile = new JarFile(fileName);
  } catch (IOException io) {
    throw new IOException("Error opening job jar: " + fileName)
      .initCause(io);
  }
  // check whether the manifest declares a main class
  Manifest manifest = jarFile.getManifest();
  if (manifest != null) {
    mainClassName = manifest.getMainAttributes().getValue("Main-Class");
  }
  jarFile.close();

  if (mainClassName == null) {
    if (args.length < 2) {
      System.err.println(usage);
      System.exit(-1);
    }
    mainClassName = args[firstArg++];
  }
  mainClassName = mainClassName.replaceAll("/", ".");
  // set up and validate the temporary directory
  File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
  ensureDirectory(tmpDir);

  final File workDir;
  try {
    workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
  } catch (IOException ioe) {
    // If user has insufficient perms to write to tmpDir, default
    // "Permission denied" message doesn't specify a filename.
    System.err.println("Error creating temp dir in hadoop.tmp.dir "
        + tmpDir + " due to " + ioe.getMessage());
    System.exit(-1);
    return;
  }

  if (!workDir.delete()) {
    System.err.println("Delete failed for " + workDir);
    System.exit(-1);
  }
  ensureDirectory(workDir);
  // register a shutdown hook so the working directory is deleted once the job finishes
  ShutdownHookManager.get().addShutdownHook(
    new Runnable() {
      @Override
      public void run() {
        FileUtil.fullyDelete(workDir);
      }
    }, SHUTDOWN_HOOK_PRIORITY);

  unJar(file, workDir);

  ArrayList<URL> classPath = new ArrayList<URL>();
  classPath.add(new File(workDir + "/").toURI().toURL());
  classPath.add(file.toURI().toURL());
  classPath.add(new File(workDir, "classes/").toURI().toURL());
  File[] libs = new File(workDir, "lib").listFiles();
  if (libs != null) {
    for (int i = 0; i < libs.length; i++) {
      classPath.add(libs[i].toURI().toURL());
    }
  }
  // invoke the job program's main method via reflection,
  // passing the remaining arguments through to it
  ClassLoader loader =
      new URLClassLoader(classPath.toArray(new URL[0]));

  Thread.currentThread().setContextClassLoader(loader);
  Class<?> mainClass = Class.forName(mainClassName, true, loader);
  Method main = mainClass.getMethod("main", new Class[] {
    Array.newInstance(String.class, 0).getClass()
  });
  String[] newArgs = Arrays.asList(args)
      .subList(firstArg, args.length).toArray(new String[0]);
  try {
    main.invoke(null, new Object[] { newArgs });
  } catch (InvocationTargetException e) {
    throw e.getTargetException();
  }
}
Once the environment is set up, the job program's main method is executed.
Take WordCount as an example:
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
        ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
        Context context
        ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
In the program's entry point, the main method:
First, a Configuration object is created. This class is shared by all Hadoop modules; it loads the various configuration files on the classpath and reads and writes the options in them.
Second, the GenericOptionsParser class is used; its purpose is to pick the generic options out of the command line and set them on the conf variable automatically, returning the remaining arguments.
If extra options are passed at submission time, for example to set the number of reducers, they can be written in GenericOptionsParser's command-line format like this:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
The rule is -D followed by an MR configuration option (by default the number of reduce tasks is 1; the number of map tasks is determined by the input splits, as discussed later).
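As a quick illustration of that rule (a minimal sketch, not part of the WordCount source; the class name ShowReduceTasks is made up), the -D pair is absorbed into the Configuration by GenericOptionsParser before the remaining arguments are handed back to the program:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ShowReduceTasks {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // e.g. args = { "-Dmapred.reduce.tasks=5", "inputPath", "outputPath" }
    String[] remaining = new GenericOptionsParser(conf, args).getRemainingArgs();
    // the -D option is now in conf; only the two paths are left over
    System.out.println("reduce tasks = " + conf.getInt("mapred.reduce.tasks", 1));
    System.out.println("remaining args = " + remaining.length);
  }
}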
Next comes the definition of the Job.
The Job constructor used is:

public Job(Configuration conf, String jobName) throws IOException {
  this(conf);
  setJobName(jobName);
}

It calls another constructor and sets the job's name (here "word count"):

public Job(Configuration conf) throws IOException {
  this(new JobConf(conf));
}

public JobConf(Configuration conf) {
  super(conf);
  if (conf instanceof JobConf) {
    JobConf that = (JobConf)conf;
    credentials = that.credentials;
  }
  checkAndWarnDeprecation();
}
The job has now had its runtime environment instantiated from the configuration; next we feed it the actual substance.
In turn, the job is given its jar, its Mapper class, its combiner class, its Reducer class, its output key type, and its output value type.
In setJarByClass:

public void setJarByClass(Class<?> cls) {
  ensureState(JobState.DEFINE);
  conf.setJarByClass(cls);
}
It first checks that the job is still in the DEFINE state (i.e. not already running), then locates the jar file through the class and assigns the jar path to the mapreduce.job.jar property (the jar is found via the findContainingJar method in the ClassUtil class).
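ClassUtil.findContainingJar is not reproduced here, but the idea behind it is simple: ask the class loader for the .class resource and, if the URL points into a jar, strip it back to the jar's path. A minimal sketch of that idea, assuming nothing beyond standard Java (this is not the actual Hadoop source):

import java.net.URL;
import java.net.URLDecoder;

public class FindJarSketch {
  // Roughly what ClassUtil.findContainingJar does: locate the .class resource
  // and, if it is served from a jar: URL, return the jar's filesystem path.
  public static String findContainingJar(Class<?> clazz) throws Exception {
    String resource = clazz.getName().replace('.', '/') + ".class";
    URL url = clazz.getClassLoader().getResource(resource);
    if (url != null && "jar".equals(url.getProtocol())) {
      String path = url.getPath();                 // e.g. file:/home/u/job.jar!/com/xxx/Foo.class
      path = path.substring(0, path.indexOf('!')); // keep only the jar part
      if (path.startsWith("file:")) {
        path = path.substring("file:".length());
      }
      return URLDecoder.decode(path, "UTF-8");
    }
    return null; // class was not loaded from a jar
  }

  public static void main(String[] args) throws Exception {
    // Prints the jar path when this class itself is run from a jar,
    // or null when it is run from a plain classes directory.
    System.out.println(findContainingJar(FindJarSketch.class));
  }
}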
The job is submitted via
job.waitForCompletion(true)
public boolean waitForCompletion(boolean verbose
    ) throws IOException, InterruptedException,
             ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
The verbose parameter: set it to true if you want the job's progress printed to the console as it runs.
public void submit()
    throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
In the submit method, the Job is handed to the corresponding Cluster and the call returns immediately, without waiting for the job to finish.
At the same time the Job instance's state is set to JobState.RUNNING, indicating that the job is in progress.
While the job is running, getJobState() can be called to query its state.
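For example (a minimal sketch of this asynchronous style; the class name AsyncSubmitSketch is made up, and the Mapper/Reducer setup is elided), a driver can call submit() and poll the job itself instead of blocking in waitForCompletion(true):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AsyncSubmitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(AsyncSubmitSketch.class);
    // ... set the Mapper/Combiner/Reducer and output types exactly as in WordCount ...
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.submit();                    // returns immediately; state becomes RUNNING
    while (!job.isComplete()) {      // poll the cluster for completion
      System.out.println("state=" + job.getJobState()
          + " map=" + job.mapProgress() + " reduce=" + job.reduceProgress());
      Thread.sleep(5000);
    }
    System.out.println(job.isSuccessful() ? "done" : "failed");
  }
}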
submit() mainly performs the following operations:
- Check the job's input, output and other parameters, obtain the configuration and the remote host's address, generate the JobID, determine the required working directory (also the directory MRAppMaster works from), and set the information needed during execution.
- Copy the required jar files and configuration files to the designated working directory on HDFS so that every node can use them.
- Compute the number of input splits, which determines the number of map tasks (see the sketch after this list).
- Call submitJob() in the YARNRunner class to submit the job, passing along the parameters it needs (such as the JobID).
- Wait for submit() to return the job's status, and finally delete the corresponding working directory.
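On the split computation in particular: for file-based input formats, FileInputFormat derives a target split size by clamping the HDFS block size between the configured minimum and maximum split sizes, and the number of splits (and hence map tasks) follows from dividing each input file by that size. A minimal sketch of that calculation with assumed, illustrative values (this is not the Hadoop source itself):

public class SplitSizeSketch {
  // Same shape as the formula FileInputFormat uses to pick the split size:
  // clamp the block size between the configured minimum and maximum.
  static long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;        // assume a 128 MB HDFS block
    long minSize = 1L;                          // assumed default minimum split size
    long maxSize = Long.MAX_VALUE;              // assumed default maximum split size
    long fileLength = 1024L * 1024 * 1024;      // a hypothetical 1 GB input file

    long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    long splits = (long) Math.ceil((double) fileLength / splitSize);
    System.out.println("split size = " + splitSize + ", number of map tasks = " + splits);
  }
}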
Before submitting, the client first connects to the cluster, via the connect method:
private synchronized void connect()
    throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
      ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
        public Cluster run()
            throws IOException, InterruptedException,
                   ClassNotFoundException {
          return new Cluster(getConfiguration());
        }
      });
  }
}
This is a synchronized method. Based on the configuration, it initializes a Cluster object, which represents the cluster.
public Cluster(Configuration conf) throws IOException {
  this(null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {

  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          // create the YARNRunner object
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        // initialize Cluster's internal member variables
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        }
        else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      }
      catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }

  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
        + MRConfig.FRAMEWORK_NAME
        + " and the correspond server addresses.");
  }
}
As you can see, the client proxy is created using java.util.ServiceLoader. In version 2.3.0 the providers are LocalClientProtocolProvider (local jobs) and YarnClientProtocolProvider (YARN jobs). Hadoop has a parameter, mapreduce.framework.name, that controls which framework is used; in MRv2 it takes two values, local and yarn, and the corresponding client is created according to this setting.
(ServiceLoader is Java's service-loading class: it loads implementations of a given interface based on provider configuration files on the classpath.)
In production this is almost always yarn, so a YARNRunner object (the client proxy class) is created to submit the job.
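The ServiceLoader mechanism itself is plain Java SE: implementations are listed in a META-INF/services/<fully-qualified-interface-name> file on the classpath and iterated at runtime. A minimal, self-contained sketch with a made-up interface (GreetingProvider) standing in for ClientProtocolProvider:

import java.util.ServiceLoader;

// Hypothetical service interface; Hadoop's equivalent is ClientProtocolProvider.
interface GreetingProvider {
  String greet();
}

public class ServiceLoaderSketch {
  public static void main(String[] args) {
    // Java scans META-INF/services/GreetingProvider files on the classpath and
    // instantiates each listed implementation, much like Cluster.initialize()
    // iterates over ClientProtocolProviders until one returns a usable protocol.
    // With no provider file on the classpath, the loop simply prints nothing.
    ServiceLoader<GreetingProvider> loader = ServiceLoader.load(GreetingProvider.class);
    for (GreetingProvider provider : loader) {
      System.out.println("Picked " + provider.getClass().getName() + ": " + provider.greet());
    }
  }
}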
Once the Cluster has been instantiated, the real submission begins:
submitter.submitJobInternal(Job.this, cluster)
JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  // check the output specification: is it set, and does the directory already exist
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);
  // get the staging area used to hold files needed while the job runs; the default
  // is /tmp/hadoop-yarn/staging/root/.staging and it can be changed with
  // yarn.app.mapreduce.am.staging-dir
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // record the submitting host's name and address
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  // obtain a new JobID; this requires an RPC call
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  // the submit directory, e.g. /tmp/hadoop-yarn/staging/root/.staging/job_1395778831382_0002
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(SHUFFLE_KEY_LENGTH);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }
    // copy the required files to the cluster; analyzed separately below (1)
    copyAndConfigureFiles(job, submitJobDir);
    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // write the split files job.split and job.splitmetainfo; the writing process
    // is the same as in MR1
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    // set the queue name
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Write job file to submit dir
    // write job.xml
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    // this is where the real submission happens; see analysis (2) below
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}
That is well over a hundred lines.
(This is the heart of job submission; everything before it was preparation.)
status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials());
This is where the RPC communication between the YarnClient and the ResourceManager comes in, including obtaining the application ID, status checks, network communication and so on.
The submitClient here is in fact an instance of YARNRunner.
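For orientation, the YARN-side flow that YARNRunner ultimately drives looks roughly like the following (a minimal sketch using the public YarnClient API; the class name YarnSubmitSketch is made up, the MRAppMaster launch context is omitted, and this is not YARNRunner's actual code):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;

public class YarnSubmitSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(conf);
    yarnClient.start();

    YarnClientApplication app = yarnClient.createApplication(); // RPC: obtain an ApplicationId
    ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
    ctx.setApplicationName("word count");
    // ... a real submission (as YARNRunner does it) also builds the MRAppMaster
    // ContainerLaunchContext, resource request and queue before submitting ...
    yarnClient.submitApplication(ctx);                          // RPC: hand the application to the RM

    yarnClient.stop();
  }
}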
The RPC call above ultimately returns a JobStatus object, whose toString method can print log information about the run on the JobClient side. (At this point the job has been handed over to YARN; all that is left on the client is monitoring, i.e. monitorAndPrintJob(), if verbose was set to true.)
This is only a rough outline of job submission; the full picture also includes the RPC communication on YARN, the map execution starting from file input on the datanodes, the shuffle phase, the reduce execution, and finally writing the results to files.
Submitting an MR job is mostly about initializing the job's environment; running the job is mostly a matter of scheduling.