MapReduce原始碼解析之OutputFormat
阿新 • • 發佈:2019-01-11
OutputFormat過程的作用就是定義資料key-value的輸出格式,給你處理好後的資料,究竟以什麼樣的形式輸出呢,才能讓下次別人拿到這個檔案的時候能準確的提取出裡面的資料。這裡,我們撇開這個話題,僅僅我知道的一些定義的資料格式的方法,比如在Redis中會有這樣的設計:
[key-length][key][value-length][value][key-length][key][value-length][value]...
或者說不一定非要省空間,直接用分隔符隔開
[key] [value]\n
[key] [value]\n
[key] [value]\n
.....
這樣逐行讀取,再以空格隔開,取出裡面的鍵值對,這麼做簡單是簡單,就是不緊湊,空間浪費得有點多。在MapReduce的OutputFormat中,有種格式用的就是這種方式。
首先必須得了解OutputFormat裡面到底有什麼東西:
[java] view plain copy print?- publicinterface OutputFormat<K, V> {
- /**
- * Get the {@link RecordWriter} for the given job.
-
* 獲取輸出記錄鍵值記錄
- *
- * @param ignored
- * @param job configuration for the job whose output is being written.
- * @param name the unique name for this part of the output.
- * @param progress mechanism for reporting progress while writing to file.
-
* @return a {@link RecordWriter} to write the output for the job.
- * @throws IOException
- */
- RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
- String name, Progressable progress)
- throws IOException;
- /**
- * Check for validity of the output-specification for the job.
- *
- * <p>This is to validate the output specification for the job when it is
- * a job is submitted. Typically checks that it does not already exist,
- * throwing an exception when it already exists, so that output is not
- * overwritten.</p>
- * 作業執行之前進行的檢測工作,例如配置的輸出目錄是否存在等
- *
- * @param ignored
- * @param job job configuration.
- * @throws IOException when output should not be attempted
- */
- void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
- }
- /** A base class for {@link OutputFormat}. */
- publicabstractclass FileOutputFormat<K, V> implements OutputFormat<K, V> {
- publicvoid checkOutputSpecs(FileSystem ignored, JobConf job)
- throws FileAlreadyExistsException,
- InvalidJobConfException, IOException {
- // Ensure that the output directory is set and not already there
- Path outDir = getOutputPath(job);
- if (outDir == null && job.getNumReduceTasks() != 0) {
- thrownew InvalidJobConfException("Output directory not set in JobConf.");
- }
- if (outDir != null) {
- FileSystem fs = outDir.getFileSystem(job);
- // normalize the output directory
- outDir = fs.makeQualified(outDir);
- setOutputPath(job, outDir);
- // get delegation token for the outDir's file system
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] {outDir}, job);
- // check its existence
- if (fs.exists(outDir)) {
- //如果輸出目錄以及存在,則拋異常
- thrownew FileAlreadyExistsException("Output directory " + outDir +
- " already exists");
- }
- }
- }
- publicstatic Path getTaskOutputPath(JobConf conf, String name)
- throws IOException {
- // ${mapred.out.dir}
- Path outputPath = getOutputPath(conf);
- if (outputPath == null) {
- thrownew IOException("Undefined job output-path");
- }
- //根據OutputCommitter獲取輸出路徑
- OutputCommitter committer = conf.getOutputCommitter();
- Path workPath = outputPath;
- TaskAttemptContext context = new TaskAttemptContext(conf,
- TaskAttemptID.forName(conf.get("mapred.task.id")));
- if (committer instanceof FileOutputCommitter) {
- workPath = ((FileOutputCommitter)committer).getWorkPath(context,
- outputPath);
- }
- // ${mapred.out.dir}/_temporary/_${taskid}/${name}
- returnnew Path(workPath, name);
- }
- publicclass FileOutputCommitter extends OutputCommitter {
- publicstaticfinal Log LOG = LogFactory.getLog(
- "org.apache.hadoop.mapred.FileOutputCommitter");
- /**
- * Temporary directory name
- */
- publicstaticfinal String TEMP_DIR_NAME = "_temporary";
- publicstaticfinal String SUCCEEDED_FILE_NAME = "_SUCCESS";
- staticfinal String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
- "mapreduce.fileoutputcommitter.marksuccessfuljobs";
- publicvoid setupJob(JobContext context) throws IOException {