Mapreduce執行過程分析(基於Hadoop2.4)——(一)
轉載自:http://www.cnblogs.com/Scott007/p/3836687.html
1 概述
該瞅瞅MapReduce的內部執行原理了,以前只知道個皮毛,再不搞搞,不然怎麼死的都不曉得。下文會以2.4版本中的WordCount這個經典例子作為分析的切入點,一步步來看裡面到底是個什麼情況。
2 為什麼要使用MapReduce
Map/Reduce,是一種模式,適合解決平行計算的問題,比如TopN、貝葉斯分類等。注意,是平行計算,而非迭代計算,像涉及到層次聚類的問題就不太適合了。
從名字可以看出,這種模式有兩個步驟,Map和Reduce。Map即資料的對映,用於把一組鍵值對對映成另一組新的鍵值對,而Reduce這個東東,以Map階段的輸出結果作為輸入,對資料做化簡、合併等操作。
而MapReduce是Hadoop生態系統中基於底層HDFS的一個計算框架,它的上層又可以是Hive、Pig等資料倉庫框架,也可以是Mahout這樣的資料探勘工具。由於MapReduce依賴於HDFS,其運算過程中的資料等會儲存到HDFS上,把對資料集的計算分發給各個節點,並將結果進行彙總,再加上各種狀態彙報、心跳彙報等,其只適合做離線計算。和實時計算框架Storm、Spark等相比,速度上沒有優勢。舊的Hadoop生態幾乎是以MapReduce為核心的,但是慢慢的發展,其擴充套件性差、資源利用率低、可靠性等問題都越來越讓人覺得不爽,於是才產生了Yarn這個新的東東,並且二代版的Hadoop生態都是以Yarn為核心。Storm、Spark等都可以基於Yarn使用。
3 怎麼執行MapReduce
明白了哪些地方可以使用這個牛叉的MapReduce框架,那該怎麼用呢?Hadoop的MapReduce原始碼給我們提供了範例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在寫完類似的程式碼後,打包成jar,在HDFS的客戶端執行:
bin/hadoop jar mapreduce_examples.jar mainClass args
即可。當然,也可以在IDE(如Eclipse)中,進行遠端執行、除錯程式。
至於,HadoopStreaming方式,網上有很多。我們這裡只討論Java的實現。
4 如何編寫MapReduce程式
如前文所說,MapReduce中有Map和Reduce,在實現MapReduce的過程中,主要分為這兩個階段,分別以兩類函式進行展現,一個是map函式,一個是reduce函式。map函式的引數是一個<key,value>鍵值對,其輸出結果也是鍵值對,reduce函式以map的輸出作為輸入進行處理。
4.1 程式碼構成
實際的程式碼中,需要三個元素,分別是Map、Reduce、執行任務的程式碼。這裡的Map類是繼承了org.apache.hadoop.mapreduce.Mapper,並實現其中的map方法;而Reduce類是繼承了org.apache.hadoop.mapreduce.Reducer,實現其中的reduce方法。至於執行任務的程式碼,就是我們程式的入口。
下面是Hadoop提供的WordCount原始碼。
1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.examples; 19 20 import java.io.IOException; 21 import java.util.StringTokenizer; 22 23 import org.apache.hadoop.conf.Configuration; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.io.IntWritable; 26 import org.apache.hadoop.io.Text; 27 import org.apache.hadoop.mapreduce.Job; 28 import org.apache.hadoop.mapreduce.Mapper; 29 import org.apache.hadoop.mapreduce.Reducer; 30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 32 import org.apache.hadoop.util.GenericOptionsParser; 33 34 public class WordCount { 35 36 public static class TokenizerMapper 37 extends Mapper<Object, Text, Text, IntWritable>{ 38 39 private final static IntWritable one = new IntWritable(1); 40 private Text word = new Text(); 41 42 public void map(Object key, Text value, Context context 43 ) throws IOException, InterruptedException { 44 StringTokenizer itr = new StringTokenizer(value.toString()); 45 while (itr.hasMoreTokens()) { 46 word.set(itr.nextToken()); 47 context.write(word, one); 48 } 49 } 50 } 51 52 public static class IntSumReducer 53 extends Reducer<Text,IntWritable,Text,IntWritable> { 54 private IntWritable result = new IntWritable(); 55 56 public void reduce(Text key, Iterable<IntWritable> values, 57 Context context 58 ) throws IOException, InterruptedException { 59 int sum = 0; 60 for (IntWritable val : values) { 61 sum += val.get(); 62 } 63 result.set(sum); 64 context.write(key, result); 65 } 66 } 67 68 public static void main(String[] args) throws Exception { 69 Configuration conf = new Configuration(); 70 String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); 71 if (otherArgs.length != 2) { 72 System.err.println("Usage: wordcount <in> <out>"); 73 System.exit(2); 74 } 75 Job job = new Job(conf, "word count"); 76 job.setJarByClass(WordCount.class); 77 job.setMapperClass(TokenizerMapper.class); 78 job.setCombinerClass(IntSumReducer.class); 79 job.setReducerClass(IntSumReducer.class); 80 job.setOutputKeyClass(Text.class); 81 job.setOutputValueClass(IntWritable.class); 82 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 83 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 84 System.exit(job.waitForCompletion(true) ? 0 : 1); 85 } 86 }
4.2 入口類
4.2.1 引數獲取
首先定義配置檔案類Configuration,此類是Hadoop各個模組的公共使用類,用於載入類路徑下的各種配置檔案,讀寫其中的配置選項。
第二步中,用到了GenericOptionsParser類,其目的是將命令列中引數自動設定到變數conf中。
GenericOptionsParser的構造方法進去之後,會進行到parseGeneralOptions,對傳入的引數進行解析:
1 private void parseGeneralOptions(Options opts, Configuration conf, 2 3 String[] args) throws IOException { 4 5 opts = buildGeneralOptions(opts); 6 7 CommandLineParser parser = new GnuParser(); 8 9 try { 10 11 commandLine = parser.parse(opts, preProcessForWindows(args), true); 12 13 processGeneralOptions(conf, commandLine); 14 15 } catch(ParseException e) { 16 17 LOG.warn("options parsing failed: "+e.getMessage()); 18 19 20 21 HelpFormatter formatter = new HelpFormatter(); 22 23 formatter.printHelp("general options are: ", opts); 24 25 } 26 27 }
而getRemainingArgs方法會獲得傳入的引數,接著在main方法中會進行判斷引數的個數,由於此處是WordCount計算,只需要傳入檔案的輸入路徑和輸出路徑即可,因此引數的個數為2,否則將退出:
1 if (otherArgs.length != 2) { 2 3 System.err.println("Usage: wordcount <in> <out>"); 4 5 System.exit(2); 6 7 }
如果在程式碼執行的時候傳入其他的引數,比如指定reduce的個數,可以根據GenericOptionsParser的命令列格式這麼寫:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5
其規則是-D加MapReduce的配置選項,當然還支援-fs等其他引數傳入。當然,預設情況下Reduce的數目為1,Map的數目也為1。
4.2.2 Job定義
定義Job物件,其構造方法為:
1 public Job(Configuration conf, String jobName) throws IOException { 2 3 this(conf); 4 5 setJobName(jobName); 6 7 }
可見,傳入的"word count"就是Job的名字。而conf被傳遞給了JobConf進行環境變數的獲取:
1 public JobConf(Configuration conf) { 2 3 super(conf); 6 7 if (conf instanceof JobConf) { 8 9 JobConf that = (JobConf)conf; 10 11 credentials = that.credentials; 12 13 } 14 checkAndWarnDeprecation(); 19 }
Job已經例項化了,下面就得給這個Job加點佐料才能讓它按照我們的要求執行。於是依次給Job新增啟動Jar包、設定Mapper類、設定合併類、設定Reducer類、設定輸出鍵型別、設定輸出值的型別。
這裡有必要說下設定Jar包的這個方法setJarByClass:
1 public void setJarByClass(Class<?> cls) { 2 3 ensureState(JobState.DEFINE); 4 5 conf.setJarByClass(cls); 6 7 }
它會首先判斷當前Job的狀態是否是執行中,接著通過class找到其所屬的jar檔案,將jar路徑賦值給mapreduce.job.jar屬性。至於尋找jar檔案的方法,則是通過classloader獲取類路徑下的資原始檔,進行迴圈遍歷。具體實現見ClassUtil類中的findContainingJar方法。
搞完了上面的東西,緊接著就會給mapreduce.input.fileinputformat.inputdir引數賦值,這是Job的輸入路徑,還有mapreduce.input.fileinputformat.inputdir,這是Job的輸出路徑。具體的位置,就是我們前面main中傳入的Args。
4.2.3 Job提交
萬事俱備,那就執行吧。
這裡呼叫的方法如下:
1 public boolean waitForCompletion(boolean verbose 2 3 ) throws IOException, InterruptedException, 4 5 ClassNotFoundException { 6 7 if (state == JobState.DEFINE) { 8 9 submit(); 10 11 } 12 13 if (verbose) { 14 15 monitorAndPrintJob(); 16 17 } else { 18 19 // get the completion poll interval from the client. 20 21 int completionPollIntervalMillis = 22 23 Job.getCompletionPollInterval(cluster.getConf()); 24 25 while (!isComplete()) { 26 27 try { 28 29 Thread.sleep(completionPollIntervalMillis); 30 31 } catch (InterruptedException ie) { 32 33 } 34 35 } 36 37 } 38 39 return isSuccessful(); 40 41 }
至於方法的引數verbose,如果想在控制檯列印當前的進度,則設定為true。
至於submit方法,如果當前在HDFS的配置檔案中配置了mapreduce.framework.name屬性為“yarn”的話,會建立一個YARNRunner物件來進行任務的提交。其構造方法如下:
1 public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, 2 3 ClientCache clientCache) { 4 5 this.conf = conf; 6 7 try { 8 9 this.resMgrDelegate = resMgrDelegate; 10 11 this.clientCache = clientCache; 12 13 this.defaultFileContext = FileContext.getFileContext(this.conf); 14 15 } catch (UnsupportedFileSystemException ufe) { 16 17 throw new RuntimeException("Error in instantiating YarnClient", ufe); 18 19 } 20 21 }
其中,ResourceMgrDelegate實際上ResourceManager的代理類,其實現了YarnClient介面,通過ApplicationClientProtocol代理直接向RM提交Job,殺死Job,檢視Job執行狀態等操作。同時,在ResourceMgrDelegate類中會通過YarnConfiguration來讀取yarn-site.xml、core-site.xml等配置檔案中的配置屬性。
下面就到了客戶端最關鍵的時刻了,提交Job到叢集執行。具體實現類是JobSubmitter類中的submitJobInternal方法。這個牛氣哄哄的方法寫了100多行,還不算其幾十行的註釋。我們看它幹了點啥。
Step1:
檢查job的輸出路徑是否存在,如果存在則丟擲異常。
Step2:
初始化用於存放Job相關資源的路徑。注意此路徑的構造方式為:
1 conf.get(MRJobConfig.MR_AM_STAGING_DIR, 2 3 MRJobConfig.DEFAULT_MR_AM_STAGING_DIR) 4 5 + Path.SEPARATOR + user 6 7 + Path.SEPARATOR + STAGING_CONSTANT
其中,MRJobConfig.DEFAULT_MR_AM_STAGING_DIR為“/tmp/hadoop-yarn/staging”,STAGING_CONSTANT為".staging"。
Step3:
設定客戶端的host屬性:mapreduce.job.submithostname和mapreduce.job.submithostaddress。
Step4:
通過RPC,向Yarn的ResourceManager申請JobID物件。
Step5:
從HDFS的NameNode獲取驗證用的Token,並將其放入快取。
Step6:
將作業檔案上傳到HDFS,這裡如果我們前面沒有對Job命名的話,預設的名稱就會在這裡設定成jar的名字。並且,作業預設的副本數是10,如果屬性mapreduce.client.submit.file.replication沒有被設定的話。
Step7:
檔案上傳到HDFS之後,還要被DistributedCache進行快取起來。這是因為計算節點收到該作業的第一個任務後,就會有DistributedCache自動將作業檔案Cache到節點本地目錄下,並且會對壓縮檔案進行解壓,如:.zip,.jar,.tar等等,然後開始任務。
最後,對於同一個計算節點接下來收到的任務,DistributedCache不會重複去下載作業檔案,而是直接執行任務。如果一個作業的任務數很多,這種設計避免了在同一個節點上對用一個job的檔案會下載多次,大大提高了任務執行的效率。
Step8:
對每個輸入檔案進行split劃分。注意這只是個邏輯的劃分,不是物理的。因為此處是輸入檔案,因此執行的是FileInputFormat類中的getSplits方法。只有非壓縮的檔案和幾種特定壓縮方式壓縮後的檔案才分片。分片的大小由如下幾個引數決定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、檔案的塊大小。
具體計算方式為:
Math.max(minSize, Math.min(maxSize, blockSize))
分片的大小有可能比預設塊大小64M要大,當然也有可能小於它,預設情況下分片大小為當前HDFS的塊大小,64M。
接下來就該正兒八經的獲取分片詳情了。程式碼如下:
1 long bytesRemaining = length; 2 3 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { 4 5 int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); 6 7 splits.add(makeSplit(path, length-bytesRemaining, splitSize, 9