1. 程式人生 > >Hadoop作業提交之客戶端作業提交

Hadoop作業提交之客戶端作業提交

一、概要描述
僅僅描述向Hadoop提交作業的第一步,即呼叫Jobclient的submitJob方法,向Hadoop提交作業。

二、 流程描述
Jobclient使用內建的JobSubmissionProtocol 例項jobSubmitClient 和JobTracker互動,最主要是提交作業、獲取作業執行資訊等。

在JobClient中作業提交的主要過程如下:

1)通過呼叫JobTracker的getNewJobId()向jobtracker請求一個新的作業ID
2)獲取job的jar、輸入分片、作業描述等幾個路徑資訊,以jobId命名。
3)其中getSystemDir()是返回jobtracker的系統目錄,來放置job相關的檔案。包括:mapreduce的jar檔案submitJarFile、分片檔案submitSplitFile、作業描述檔案submitJobFile
4)檢查作業的輸出說明,如果沒有指定輸出目錄或輸出目錄以及存在,則作業不提交。參照org.apache.hadoop.mapreduce.lib.output.

FileOutputFormatcheckOutputSpecs方法。如果沒有指定,則丟擲InvalidJobConfException,檔案已經存在則丟擲FileAlreadyExistsException
5)計算作業的輸入分片。通過InputFormat的getSplits(job)方法獲得作業的split並將split序列化封裝為RawSplit。返回split數目,也即代表有多個分片有多少個map。詳細參見InputFormat獲取Split的方法。
6)writeNewSplits 方法把輸入分片寫到JobTracker的job目錄下。
7)將執行作業所需的資源(包括作業jar檔案,配置檔案和計算所得的輸入分片)複製到jobtracker的檔案系統中一個以作業ID命名的目錄下。
8)使用控制代碼JobSubmissionProtocol通過RPC遠端呼叫的submitJob()方法,向JobTracker提交作業。JobTracker作業放入到記憶體佇列中,由作業排程器進行排程。並初始化作業例項。JobTracker建立job成功後會給JobClient傳回一個JobStatus物件用於記錄job的狀態資訊,如執行時間、Map和Reduce任務完成的比例等。JobClient會根據這個JobStatus物件建立一個 NetworkedJob的RunningJob物件,用於定時從JobTracker獲得執行過程的統計資料來監控並列印到使用者的控制檯。

mapreduce 作業提交和執行

mapreduce 作業提交和執行

引用下Hadoop: The Definitive Guide, Second Edition中的一張經典圖。這裡僅僅描述上圖中的左上角第一個框部分內容,即本步驟的最終輸出僅僅是將作業提交到JobTracker。其他後續文章會繼續描述。

三、程式碼詳細

Jobclient:JobClient是向JobTracker提交作業的介面,可以理解為Hadoop的Mapreduce作業框架向用戶開放的作業提交入口。可以提交作業,監視作業狀態等

JobSubmissionProtocol(為什麼0.20.1的javadoc中找不到這個介面,雖然0.20.1 0.20.2程式碼中都是相同的用法,知道2.2.0貌似重新命名為被ClientProtocol替換):JobClient和JobTracker進行通訊的一個協議。JobClient實際上是用這個控制代碼來提交鎖業並且監視作業的執行狀況。

這個介面有兩個實現:LocalJobRunner(conf)當mapred-site.xml中的mapred.job.tracker值為local是為此物件。表示在單機上執行;如果為一個地址的話則是JobTracker的物件,表示分散式執行。

詳細可參照JobClient中 的初始化程式碼:

123456789101112131415161718192021/**   *如果是非local的就會 連線到指定的JobTracker     */publicvoidinit(JobConf conf)throwsIOException{Stringtracker=conf.get("mapred.job.tracker","local");if("local".equals(tracker)){this.jobSubmitClient=newLocalJobRunner(conf);}else{this.jobSubmitClient=createRPCProxy(JobTracker.getAddress(conf),conf);}}/*  * RPC不是本次主題重點,可參照後續發表的專題內容  */privateJobSubmissionProtocol createRPCProxy(InetSocketAddress addr,Configuration conf)throwsIOException{return(JobSubmissionProtocol)RPC.getProxy(JobSubmissionProtocol.class,JobSubmissionProtocol.versionID,addr,getUGI(conf),conf,NetUtils.getSocketFactory(conf,JobSubmissionProtocol.class));}
InputFormat重要,但暫不展開(此處會有連結)

Split重要,但暫不展開(此處會有連結)
RowSplit要,但暫不展開(此處會有連結)
通過程式碼來了解流程,瞭解如何呼叫JobClient向Hadoop叢集提交作業。