1. 程式人生 > >MapReduce原始碼解析之Outputforamt

MapReduce原始碼解析之Outputforamt

     OutputFormat過程的作用就是定義資料key-value的輸出格式,給你處理好後的資料,究竟以什麼樣的形式輸出呢,才能讓下次別人拿到這個檔案的時候能準確的提取出裡面的資料。這裡,我們撇開這個話題,僅僅我知道的一些定義的資料格式的方法,比如在Redis中會有這樣的設計:

[key-length][key][value-length][value][key-length][key][value-length][value]...

或者說不一定非要省空間,直接搞過分隔符

[key]   [value]\n

[key]   [value]\n

[key]   [value]\n

.....

這樣逐行讀取,再以空格隔開,取出裡面的鍵值對,這麼做簡單是簡單,就是不緊湊,空間浪費得有點多。在MapReduce的OutputFormat的有種格式用的就是這種方式。

        首先必須得了解OutputFormat裡面到底有什麼東西:

[java] view plain copy  print?
  1. publicinterface OutputFormat<K, V> {  
  2.   /**  
  3.    * Get the {@link RecordWriter} for the given job. 
  4.    * 獲取輸出記錄鍵值記錄
     
  5.    * 
  6.    * @param ignored 
  7.    * @param job configuration for the job whose output is being written. 
  8.    * @param name the unique name for this part of the output. 
  9.    * @param progress mechanism for reporting progress while writing to file. 
  10.    * @return a {@link RecordWriter} to write the output for the job.
     
  11.    * @throws IOException 
  12.    */
  13.   RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,  
  14.                                      String name, Progressable progress)  
  15.   throws IOException;  
  16.   /**  
  17.    * Check for validity of the output-specification for the job. 
  18.    *   
  19.    * <p>This is to validate the output specification for the job when it is 
  20.    * a job is submitted.  Typically checks that it does not already exist, 
  21.    * throwing an exception when it already exists, so that output is not 
  22.    * overwritten.</p> 
  23.    * 作業執行之前進行的檢測工作,例如配置的輸出目錄是否存在等 
  24.    * 
  25.    * @param ignored 
  26.    * @param job job configuration. 
  27.    * @throws IOException when output should not be attempted 
  28.    */
  29.   void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;  
  30. }  
很簡單的2個方法,RecordWriter比較重要,後面的key-value的寫入操作都是根據他來完成的。但是他是一個介面,在MapReduce中,我們用的最多的他的子類是FileOutputFormat: [java] view plain copy  print?
  1. /** A base class for {@link OutputFormat}. */
  2. publicabstractclass FileOutputFormat<K, V> implements OutputFormat<K, V> {  
他是一個抽象類,但是實現了介面中的第二個方法checkOutputSpecs()方法: [java] view plain copy  print?
  1. publicvoid checkOutputSpecs(FileSystem ignored, JobConf job)   
  2.     throws FileAlreadyExistsException,   
  3.            InvalidJobConfException, IOException {  
  4.     // Ensure that the output directory is set and not already there
  5.     Path outDir = getOutputPath(job);  
  6.     if (outDir == null && job.getNumReduceTasks() != 0) {  
  7.       thrownew InvalidJobConfException("Output directory not set in JobConf.");  
  8.     }  
  9.     if (outDir != null) {  
  10.       FileSystem fs = outDir.getFileSystem(job);  
  11.       // normalize the output directory
  12.       outDir = fs.makeQualified(outDir);  
  13.       setOutputPath(job, outDir);  
  14.       // get delegation token for the outDir's file system
  15.       TokenCache.obtainTokensForNamenodes(job.getCredentials(),   
  16.                                           new Path[] {outDir}, job);  
  17.       // check its existence
  18.       if (fs.exists(outDir)) {  
  19.         //如果輸出目錄以及存在,則拋異常
  20.         thrownew FileAlreadyExistsException("Output directory " + outDir +   
  21.                                              " already exists");  
  22.       }  
  23.     }  
  24.   }  
就是檢查輸出目錄在不在的操作。在這個類裡還出現了一個輔助類: [java] view plain copy  print?
  1. publicstatic Path getTaskOutputPath(JobConf conf, String name)   
  2.   throws IOException {  
  3.     // ${mapred.out.dir}
  4.     Path outputPath = getOutputPath(conf);  
  5.     if (outputPath == null) {  
  6.       thrownew IOException("Undefined job output-path");  
  7.     }  
  8.     //根據OutputCommitter獲取輸出路徑
  9.     OutputCommitter committer = conf.getOutputCommitter();  
  10.     Path workPath = outputPath;  
  11.     TaskAttemptContext context = new TaskAttemptContext(conf,  
  12.                 TaskAttemptID.forName(conf.get("mapred.task.id")));  
  13.     if (committer instanceof FileOutputCommitter) {  
  14.       workPath = ((FileOutputCommitter)committer).getWorkPath(context,  
  15.                                                               outputPath);  
  16.     }  
  17.     // ${mapred.out.dir}/_temporary/_${taskid}/${name}
  18.     returnnew Path(workPath, name);  
  19.   }  
就是上面OutputCommiter,裡面定義了很多和Task,job作業相關的方法。很多時候都會與OutputFormat合作的形式出現。他也有自己的子類實現FileOutputCommiter: [java] view plain copy  print?
  1. publicclass FileOutputCommitter extends OutputCommitter {  
  2.   publicstaticfinal Log LOG = LogFactory.getLog(  
  3.       "org.apache.hadoop.mapred.FileOutputCommitter");  
  4. /** 
  5.    * Temporary directory name  
  6.    */
  7.   publicstaticfinal String TEMP_DIR_NAME = "_temporary";  
  8.   publicstaticfinal String SUCCEEDED_FILE_NAME = "_SUCCESS";  
  9.   staticfinal String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =  
  10.     "mapreduce.fileoutputcommitter.marksuccessfuljobs";  
  11.   publicvoid setupJob(JobContext context) throws IOException {  
  12.