Hadoop OutputFormat淺析
問題:reduce輸出時,如果不是推測任務寫結果時會先寫臨時目錄最後移動到輸出目錄嗎?
下面部分轉自Hadoop官網說明
OutputFormat 描述Map/Reduce作業的輸出樣式。
Map/Reduce框架根據作業的OutputFormat來:
- 檢驗作業的輸出,例如檢查輸出路徑是否已經存在。
- 提供一個RecordWriter的實現,用來輸出作業結果。 輸出文件保存在FileSystem上。
TextOutputFormat是默認的 OutputFormat。
任務的Side-Effect File
在一些應用程序中,子任務需要產生一些side-file,這些文件與作業實際輸出結果的文件不同。
在這種情況下,同一個Mapper或者Reducer的兩個實例(比如預防性任務)同時打開或者寫 FileSystem上的同一文件就會產生沖突。因此應用程序在寫文件的時候需要為每次任務嘗試(不僅僅是每次任務,每個任務可以嘗試執行很多次)選取一個獨一無二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。
為了避免沖突,Map/Reduce框架為每次嘗試執行任務都建立和維護一個特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目錄,這個目錄位於本次嘗試執行任務輸出結果所在的FileSystem上,可以通過${mapred.work.output.dir}來訪問這個子目錄。 對於成功完成的任務嘗試,只有${mapred.output.dir}/_temporary/_${taskid}下的文件會移動
在任務執行期間,應用程序在寫文件時可以利用這個特性,比如 通過 FileOutputFormat.getWorkOutputPath()獲得${mapred.work.output.dir}目錄, 並在其下創建任意任務執行時所需的side-file,框架在任務嘗試成功時會馬上移動這些文件,因此不需要在程序內為每次任務嘗試選取一個獨一無二的名字。
註意:在每次任務嘗試執行期間,${mapred.work.output.dir} 的值實際上是 ${mapred.output.dir}/_temporary/_{$taskid},這個值是Map/Reduce框架創建的。 所以使用這個特性的方法是,在FileOutputFormat.getWorkOutputPath() 路徑下創建side-file即可。
對於只使用map不使用reduce的作業,這個結論也成立。這種情況下,map的輸出結果直接生成到HDFS上。
RecordWriter
RecordWriter 生成<key, value> 對到輸出文件。
RecordWriter的實現把作業的輸出結果寫到 FileSystem。
下面部分轉自http://www.cnblogs.com/noures/archive/2012/07/13/2589767.html
在 Hadoop中,OutputFormat和InputFormat是相對應的兩個東西。相比於InputFormat,OutputFormat似乎沒 有那麽多細節。InputFormat涉及到對輸入數據的解析和劃分,繼而影響到Map任務的數目,以及Map任務的調度(見《Hadoop InputFormat淺析》)。而OutputFormat似乎像其字面意思那樣,僅僅是完成對輸出數據的格式化。
對於輸出數據的格式化,這個應該沒什麽值得多說的。根據需要,OutputFormat愛把輸出寫成什麽格式就寫成什麽格式、愛把輸出寫到數據庫就寫到數據庫、愛把輸出通過網絡發給其他服務就發給其他服務...
不過,OutputFormat所做的事情其實並不限於此。OutputFormat類包含如下三個方法:RecordWriter getRecordWriter(TaskAttemptContext context); void checkOutputSpecs(JobContext context); OutputCommitter getOutputCommitter(TaskAttemptContext context);其中: checkOutputSpecs是 在JobClient提交Job之前被調用的(在使用InputFomat進行輸入數據劃分之前),用於檢測Job的輸出路徑。比 如,FileOutputFormat通過這個方法來確認在Job開始之前,Job的Output路徑並不存在,然後該方法又會重新創建這個Output 路徑。這樣一來,就能確保Job結束後,Output路徑下的東西就是且僅是該Job輸出的。
getRecordWriter用於返回一個RecordWriter的實例,Reduce任務在執行的時候就是利用這個實例來輸出Key/Value的。(如果Job不需要Reduce,那麽Map任務會直接使用這個實例來進行輸出。)
RecordWriter有如下兩個方法:
void write(K key, V value); void close(TaskAttemptContext context);前者負責將Reduce輸出的Key/Value寫成特定的格式,後者負責對輸出做最後的確認並關閉輸出。 前面提到的OutputFormat的字面含義,其實就是由這個RecordWriter來實現的。 而第三個方法,getOutputCommitter則 用於返回一個OutputCommitter的實例。(在Hadoop-0.20中,MapReduce有兩套API。 getOutputCommitter是在NewAPI中才提供的,OldAPI裏面並沒有。不過OldAPI同樣有OutputCommtter這個東 西,只是不能通過OutputFormat來定制而已。) OutputCommitter用於控制Job的輸出環境,它有下面幾個方法:
void setupJob(JobContext jobContext); void commitJob(JobContext jobContext); void abortJob(JobContext jobContext, JobStatus.State state); void setupTask(TaskAttemptContext taskContext); boolean needsTaskCommit(TaskAttemptContext taskContext); void commitTask(TaskAttemptContext taskContext); void abortTask(TaskAttemptContext taskContext);
Job開始被執行之前,框架會調用OutputCommitter.setupJob()為Job創建一個輸出路徑;
如果Job成功完成,框架會調用OutputCommitter.commitJob()提交Job的輸出;
如果Job失敗,框架會調用OutputCommitter.abortJob()撤銷Job的輸出;
對 應於Job下的每一個Task,同樣牽涉創建、提交和撤銷三個動作,分別由OutputCommitter.setupTask()、 OutputCommitter.commitTask()、OutputCommitter.abortTask()來完成。而一個Task可能沒有輸 出,從而也就不需要提交,這個可以通過OutputCommitter.needsTaskCommit()來判斷;
具體OutputCommitter的這些方法裏面完成了什麽樣的操作,這是由具體的OutputCommitter來定制的,可以任意去實現。比如,FileOutputCommitter完成了如下操作:
setupJob - mkdir ${mapred.output.dir}/_temporary
commitJob - touch ${mapred.output.dir}/_SUCCESS && rm -r ${mapred.output.dir}/_temporary
abortJob - rm -r ${mapred.output.dir}/_temporary
setupTask - <nothing>
needsTaskCommit - test -d ${mapred.output.dir}/_temporary/_${TaskAttemptID}
commitTask - mv ${mapred.output.dir}/_temporary/_${TaskAttemptID}/* ${mapred.output.dir}/
abortTask - rm -r ${mapred.output.dir}/_temporary/_${TaskAttemptID}
(註意,上面這些路徑都是HDFS上的,不是某個TaskTracker本地機器上的。)
其 中的邏輯是:Job執行的時候,Task的輸出放到Output路徑下的_temporary目錄的以TaskAttemptID命名的子目錄中。只有當 Task成功了,相應的輸出才會被提交到Output路徑下。而只有當整個Job都成功了,才會在Output路徑下放置_SUCCESS文件。 _SUCCESS文件的存在表明了Output路徑下的輸出信息是正確且完整的;而如果_SUCCESS文件不存在,Output下的信息也依然是正確的 (這已經由commitTask保證了),但是不一定是完整的(可能只包含部分Reduce的輸出)。
與之對應,FileOutputFormat會讓它所創建的RecordWriter將輸出寫到${mapred.output.dir}/_temporary/_${TaskAttemptID}/下。當然,Map和Reduce任務也可以自己向這個路徑put數據。接下來就是到在哪裏去執行這些方法的問題了。
一 個Job被提交到JobTracker後會生成若幹的Map和Reduce任務,這些任務會被分派到TaskTracker上。對於每一個 Task,TaskTracker會使用一個子JVM來執行它們。那麽對於Task的setup/commit/abort這些操作,自然應該在執行 Task的子JVM裏面去完成:
當一個Task被關聯到一個子JVM後,在任務初始化階段,OutputCommitter.setupTask()會被調用;
當 一個任務執行成功完成了之後,脫離子JVM之前,OutputCommitter.commitTask()會被調用。不過這裏還有兩個細節:1、需要先 調用OutputCommitter.needsTaskCommit()來確定是否有輸出需要提交;2、提交之前還有一個同步邏輯,需要由 JobTracker同意提交後才能提交。因為Hadoop有推測執行的邏輯,一個Task可能在多個TaskTracker上同時執行,但是它們之中最 多只有一個能得到提交,否則可能導致結果的錯亂;
當 一個任務執行失敗時,OutputCommitter.abortTask()會被調用。這個調用很特殊,它不大可能在執行任務的子JVM裏面完成。因為 執行任務的子JVM裏面跑的是用戶提供的Map/Reduce代碼,Hadoop框架是無法保證這些代碼的穩定性的,所以任務的失敗往往伴隨著子JVM的 異常退出(這也就是為什麽要用子JVM來執行Map和Reduce任務的原因,否則異常退出的可能就是整個框架了)。於是,對於失敗的任 務,JobTracker除了要考慮它的重試之外,還要為其生成一個cleanup任務。這個cleanup任務像普通的Map和Reduce任務一樣, 會被分派到TaskTracker上去執行(不一定分派到之前執行該任務失敗的那個TaskTracker上,因為輸出是在HDFS上,是全局的)。而它 的執行邏輯主要就是調用OutputCommitter.abortTask();
而對於Job的setup/commit/abort,則顯然不能使用上面的邏輯。
從 時間上說,OutputCommitter.setupJob()應該在所有Map和Reduce任務執行之前被調用、 OutputCommitter.commitJob()應該在所有Map和Reduce任務執行之後被調用、而 OutputCommitter.abortJob()應該在Job確認失敗之後被調用;
從地點上說,可能調用這些方法的地方無外乎JobClient、JobTracker、或TaskTracker;
JobClient 應該第一個被排除,因為Job的執行並不依賴於JobClient。JobClient在提交完Job之後就可以退出了,它的退出並不會影響Job的繼續 執行(如果不退出則可以接收JobTracker的進度反饋)。所以,不可能依靠JobClient在Job成功以後來調用 OutputCommitter.commitJob();
JobTracker 呢?貌似是個合適的地方,因為JobTracker明確知道Job的開始與結束、成功與失敗。但是實際上還是不能由JobTracker來調用這些方法。 就像前面說到的OutputCommitter.abortTask()一樣,既然JobTracker知道了Task的失敗,卻不直接為它清理輸出,而 是通過生成一個對應的cleanup任務來完成清理工作。為什麽要這樣做呢?其實原因很簡單,因為OutputCommitter是獨立於Hadoop框 架,可以由用戶自己定制的。Hadoop框架不能保證用戶定制代碼的穩定性,當然不能讓它直接在JobTracker上執行。必須啟動一個新的JVM來執 行這些方法,那麽正好TaskTracker上已經有這樣的邏輯了。
所 以,對於Job的setup/commit/abort,跟OutputCommitter.abortTask()類似,JobTracker會生成對 應的setup任務和cleanup任務。在初始化Job的時期將Job的setup任務分派給TaskTracker,TaskTracker執行這個 setup任務所要做的事情就是調用OutputCommitter.setupJob();在Job結束時,Job的cleanup任務將分派給 TaskTracker,TaskTracker執行這個cleanup任務所要做的事情就是根據Job的執行結果是成功或是失敗,來調用 OutputCommitter.commitJob()或OutputCommitter.abortJob()。
為 了保證OutputCommitter.setupJob()在所有Map和Reduce任務執行之前被調用,在JobTracker上,Job的初始化 被分成了兩個步驟:一是為Job生成一堆任務,二是將setup任務分派給TaskTracker去執行,並等待它執行完成。在這之後,初始化才算完 成,Map和Reduce任務才能得到分派。
可見,在Job執行的過程中,除了我們關註的Map和Reduce任務之外,還會有一些隱藏的setup和cleanup任務。不過這些任務都有一個共同點,它們都可以是用戶定制的。
Hadoop OutputFormat淺析