1. 程式人生 > >HDFS和MapReduce 架構分析----阿冬專欄

HDFS和MapReduce 架構分析----阿冬專欄

  圖2示出的是namenode和datanode。

namenode和datanode 

圖2 namenode和datanode

    檔案寫入時的步驟為:

    a)Client向namenode發起檔案寫入的請求。

    b)namenode根據檔案大小和檔案塊配置情況,將它管理的datanode節點的資訊返回給Client。

    c)Client將檔案劃分為多個塊,根據datanode的地址資訊,按順序寫入到每一個datanode塊中。

    檔案讀取時的步驟為:

    a)Client向namenode發起檔案讀取的請求。

    b)namenode返回儲存檔案的datanode的資訊。

    c)Client讀取檔案資訊。

    作為檔案系統的管理員,沒有namenode,檔案系統將無法使用。如果執行namenode服務的機器毀壞,檔案系統上所有的檔案將會丟失,且不知道如何根據datanode的資料塊來重建檔案。Hadoop為此提供了2種機制對namenode實現冗餘備份。

    圖3示出的是冗餘namenode。

冗餘namenode 

圖3 冗餘namenode

    一種機制是備份儲存檔案系統元資料的檔案。一般配置是:將檔案系統元資料寫入本地磁碟的同時,寫入一個遠端掛載的網路檔案系統(NFS)。

    另一種機制是執行一個輔助的namenode,但它不能被用作namenode。輔助的namenode通過編輯日誌定期合併名稱空間映象。輔助namenode一般在另一臺單獨的物理計算機上執行,因為它需要佔用大量CPU時間和與namenode相同容量的記憶體來執行合併操作。它會儲存合併後的名稱空間映象的副本,並在namenode發生故障時啟用。

    但是,輔助namenode儲存的狀態總是滯後於主節點,一般情況融合2種機制。主namenode故障時,把儲存在NFS上的namenode元資料複製到輔助namenode上,並將其作為新的主namenode執行。

    2.3 命令列介面

    HDFS 的檔案和目錄有與POSIX 相似的許可權模式,通常是三類許可權模式(rwx)。叢集管理員可以通過命令列介面與HDFS互動,執行所有常見的檔案系統操作,如建立目錄、移動檔案、刪除資料、列出目錄等等。HDFS並不是一個Unix檔案系統,不支援像ls和cp這種標準的Unix檔案命令。Hadoop提供了一套與Linux檔案命令類似的命令列工具,通過shell命令操作檔案和目錄。

    Hadoop也提供操作HDFS檔案和目錄的Java庫,用於以程式設計方式訪問HDFS。

    一般情況下,由MapReduce框架讀取HDFS檔案和處理資料單元。除非需要定製資料的匯入和匯出,否則幾乎不必程式設計來讀寫HDFS檔案。

3 Hadoop MapReduce淺析

    最簡單的MapReduce 應用程式至少包含3 個部分:一個Map 函式、一個Reduce 函式和一個main 函式。main 函式將作業控制和檔案輸入/輸出結合起來。在這點上,Hadoop提供了大量的介面和抽象類,從而為Hadoop應用程式開發人員提供許多工具,可用於除錯和效能度量等。

    MapReduce本身就是用於並行處理大資料的軟體框架。MapReduce的根源是函式性程式設計中的Map函式和Reduce 函式。它由2 個可能包含許多事例(許多Map和Reduce)的操作組成。Map函式接受一組資料並將其轉換為一個鍵/值對列表,輸入域中的每個元素對應一個鍵/值對。Reduce函式接受Map函式生成的列表,然後根據它們的鍵(為每個鍵生成一個鍵/值對)縮小鍵/值對列表。可以在每個域上執行Map函式和Reduce函式,然後將輸出的鍵/值對列表輸入到另一個Reduce函式,就可得到與前面一樣的結果。換句話說,可以在輸入域並行使用相同的操作,得到的結果是一樣的,但速度更快。MapReduce的並行功能可在任意數量的系統上使用。

    圖4示出的是MapReduce思想。

    3.1 JobTracker和TaskTracker

    Hadoop MapReduce 引擎由JobTracker 和Task?Tracker組成。圖5示出的是Hadoop的結構。

    JobTracker負責管理排程所有作業,它是整個系統分配任務的核心。與HDFS的namenode類似,Job?Tracker也是唯一的。它是Hadoop叢集中唯一負責控制MapReduce應用程式的系統,在應用程式提交之後,將提供包含在HDFS中的輸入和輸出目錄,JobTracker使用檔案塊資訊(物理量和位置)確定如何建立其他TaskTracker從屬任務,MapReduce應用程式被複制到每個出現檔案塊的節點,為特定節點上的每個檔案塊建立一個唯一的從屬任務。

MapReduce思想 

圖4 MapReduce思想

Hadoop的結構 

圖5 Hadoop的結構

 TaskTracker具體負責執行使用者定義的操作,每個任務被分割為任務集,包含Map任務和Reduce任務。任務是具體執行的基本單元,TaskTracker執行過程中需要向JobTracker傳送心跳資訊,彙報每個任務的執行狀態,幫助JobTracker收集作業執行的整體情況,為下次任務的分配提供依據。

    在Hadoop中,Client(任務的提交者)是一組API,使用者需要自定義自己需要的內容,由Client將作業及其配置提交到JobTracker,並監控執行狀況。

    與HDFS的通訊機制相同,Hadoop MapReduce也使用協議介面來實現伺服器間的通訊。Client與Task?Tracker及TaskTracker之間沒有直接通訊。由於叢集各主機的通訊比較複雜,點對點直接通訊難以維持狀態資訊,所以由JobTracker收集整理統一轉發。

    3.2 MapReduce的工作機制

    JobClient.runJob(conf)這一行簡短的程式碼後面隱藏著大量的處理細節。整個過程如圖6所示,包含如下4個獨立的實體。

 執行MapReduce作業的工作原理 

圖6 執行MapReduce作業的工作原理

    a)客戶端:提交MapReduce作業。

    b)JobTracker:協調作業的執行。

    c)TaskTracker:執行作業劃分後的任務。

    d)分散式檔案系統(一般為HDFS):用來在其他實體間共享作業檔案。

    3.2.1 作業的提交

    JobClient的runJob()方法是用於新建JobClient例項並呼叫其submitJob()方法。提交作業後,runJob()每秒檢測作業的進度,如果發現上次報告後有變化,便把進度報告給控制檯。作業完成後,如果成功,就顯示作業計數器。如果失敗,導致作業失敗的錯誤被記錄到控制檯。

    JobClient的runJob()方法(圖6步驟①)實現過程如下:

    a)通過JobTracker的getNewJobId()方法,向Job?Tracker請求一個新的作業ID(圖6步驟②)。

    b)檢查作業的輸出說明。例如,如果沒有指定輸出目錄或輸出目錄已經存在,作業就不提交,錯誤返回給MapReduce程式。

    c)將執行作業所需要的資源(包括作業JAR 檔案、配置檔案和輸入分片)複製到JobTracker檔案系統中的一個以作業ID命名的目錄下(圖6步驟③)。作業JAR 的副本較多(由mapred.submit.replication 屬性控制,預設值為10), 因此在執行作業的任務時,叢集中有很多個副本可供TaskTracker訪問。

    d)通過呼叫JobTracker 的submitJob()方法告知JobTracker準備執行作業(圖6步驟④)。

    e)計算作業的輸入分片。如果分片無法計算,例如,因為輸入路徑不存在,作業就不提交,錯誤返回給MapReduce程式(圖6步驟⑥)。

3.2.2 作業的初始化

    當JobTracker接收到對其submitJob()方法的呼叫後,會把此呼叫放入一個內部佇列中,交由作業排程器(job scheduler)進行排程,並對其進行初始化。初始化包括建立一個表示正在執行作業的物件——封裝任務和記錄資訊,以便跟蹤任務的狀態和程序(圖6步驟⑤)。

    為了建立任務執行列表,作業排程器首先從共享檔案系統中獲取JobClient已計算好的輸入分片資訊(圖6步驟⑥)。然後為每個分片建立一個map任務。建立reduce 任務的數量由JobConf 的mapred.reduce.task屬性決定,它是用setNumReduceTasks()方法來設定的,然後排程器建立相應數量的要執行的reduce任務。任務在此時被指定ID。

    3.2.3 任務的分配

    TaskTracker定期向JobTracker傳送心跳。心跳告知JobTracker,TaskTracker是否還存活,同時也充當兩者之間的訊息通道。作為心跳的一部分,TaskTracker會指明它是否已經準備好執行新的任務。如果是,JobTracker會為它分配一個任務,並使用心跳的返回值與TaskTracker 進行通訊(圖6步驟⑦)。

    在JobTracker 為TaskTracker 選擇任務之前,JobTracker必須先選定任務所在的作業。一旦選擇好作業,JobTracker就可以為該作業選定一個任務。

    對於map任務和reduce任務,TaskTracker有固定數量的任務槽。例如,1個TaskTracker可能同時執行2個map 任務和2 個reduce 任務。準確數量由TaskTracker核心的數量和記憶體大小來決定。作業排程器在處理reduce任務槽之前,會填滿空閒的map任務槽,因此如果TaskTracker至少有一個空閒的map任務槽,JobTracker會先為它選擇一個map任務。

    為了選擇一個reduce任務,JobTracker簡單地從待執行的reduce任務列表中選取下一個來執行,用不著考慮資料的本地化。然而,對於一個map任務,JobTracker會考慮TaskTracker的網路位置,並選取一個距離其輸入分片檔案最近的TaskTracker。

    在最理想的情況下,任務是資料本地化的(data-local), 也就是任務執行在輸入分片所在的節點上。同樣,任務也可能是機架本地化的(rack-local)。任務和輸入分片在同一個機架,但不在同一節點上。一些任務既不是資料本地化的,也不是機架本地化的,而是操作另一個機架上的資料。

    3.2.4 任務的執行

    現在,TaskTracker已經被分配了一個任務,下一步是執行該任務。第一步,通過從共享檔案系統把作業的JAR檔案複製到TaskTracker所在的檔案系統,從而實現作業的JAR檔案本地化。同時,TaskTracker將應用程式所需要的全部檔案從共享檔案系統複製到本地磁碟(圖6步驟⑧)。第二步,TaskTracker為任務新建一個本地工作目錄,並把JAR檔案中的內容解壓到這個資料夾下。第三步,TaskTracker新建一個TaskRunner例項來執行該任務。

    TaskRunner啟動一個新的JVM(圖6步驟⑨)來執行每個任務(圖6步驟⑩), 以便使用者定義的map和re?duce 函式的任何軟體問題都不會影響到TaskTracker(例如導致其崩坡或掛起等)。任務的子程序每隔幾秒便告知父程序它的進度,直到任務完成。

    3.2.5 進度和狀態的更新

    MapReduce作業是長時間執行的批量作業,這是一個很長的時間段,對於使用者而言,能夠得知作業進展是很重要的。一個作業和它的每個任務都有一個狀態(status), 包括作業或任務的狀態(如執行狀態、成功完成、失敗狀態)、map和reduce的進度、作業計數器的值、狀態資訊或描述(可以由使用者程式碼來設定)。

    任務在執行時,對其進度保持追蹤。對map任務,任務進度是已處理輸入所佔的比例。對reduce任務,情況稍微複雜,但系統仍然會估計已處理reduce輸入的比例。比如,如果reduce任務已經執行reducer一半的輸入,那麼任務的進度便是5/6。因為已經完成複製和排序階段(各1/3),並且已經完成reduce階段的一半(1/6)。

    如果任務報告了進度,便會設定一個標誌以表明狀態變化將被髮送到TaskTracker。有一個獨立的執行緒每隔3 s檢查一次此標誌,如果已設定,則告知Task?Tracker當前任務狀態。同時,TaskTracker每隔5 s傳送心跳到JobTracker(5 s這個間隔是最小值,心跳間隔實際上由叢集的大小來決定,更大的叢集,間隔會更長一些),並且將TaskTracker執行的所有任務的狀態傳送至JobTracker。

    JobTracker將這些更新狀態合併起來,生成一個表明所有執行作業及其所含任務狀態的全域性檢視。同時,JobClient通過查詢JobTracker來獲取最新狀態。客戶端也可以使用JobClient的getJob()方法來得到一個RunningJob的例項,後者包含作業的所有狀態資訊。

    3.2.6 作業的完成

    當JobTracker收到作業最後一個任務已完成的通知後,便把作業的狀態設定為“成功”。然後,在JobCli?ent查詢狀態時,便知道任務已經完成,於是JobClient列印一條訊息告知使用者,然後從runJob()方法返回。最後,JobTracker清空作業的工作狀態,指示TaskTracker也清空作業的工作狀態。

3.3 作業的排程

    早期版本的Hadoop使用一種非常簡單的方法來排程使用者的作業。按照作業提交的順序,即先進先出(FIFO)排程演算法來執行作業。典型情況下,每個作業都會使用整個叢集,因此作業必須等待直到輪到自己執行。雖然共享叢集極有可能為多使用者提供大量資源,但問題在於如何公平地在使用者之間分配資源,這需要一個更好的排程器。

    後來版本的Hadoop 加入設定作業優先順序的功能。可以通過設定mapred.job.priority屬性或JobClient的setJobPriority()方法來設定優先順序。在這2種方法中,可以選擇VERY_HIGH、HIGH、NORMAL、LOW、VERY_LOW中的一個值作為優先順序。作業排程器會選擇優先順序最高的那個作業執行。

    在Hadoop中,MapReduce的排程器可以選擇。預設的排程器是FIFO,還可選擇Fair Scheduler和Capaci?ty Scheduler。

    Fair Scheduler的目標是讓每個使用者公平地共享叢集能力。如果只有一個作業,它會得到叢集的所有資源。隨著提交的作業越來越多,空閒的TaskTracker任務槽會以“讓每個使用者公平共享叢集”這種方式進行分配。即便一個使用者的長時間作業正在執行而且還在進行過程中,另一個使用者的一個短的作業會在合理的時間內完成。

    作業都被放在作業池中,在預設情況下,每個使用者都有自己的作業池。Fair Scheduler支援搶佔,如果一個池在特定一段時間內未得到公平的資源,它會中止執行池中使用過多資源的任務,以便把任務槽讓給執行資源不足的池。

    針對多作業排程,Capacity Scheduler排程方式下,叢集由很多佇列組成,每個佇列有一個分配能力。這一點與Fair Scheduler類似,只不過在每個佇列內部,作業根據FIFO方式排程。即Capacity Scheduler允許為每個使用者模擬一個獨立的使用FIFO Scheduling的MapReduce叢集。

4 應用場景及展望

    雲端計算的偉大之處就在於在進行大資料處理時不必再像以往一樣購買大量的伺服器叢集,租用伺服器處理大資料更加利於控制成本。Hadoop作為一個重量級的分散式處理開源框架已經在大資料處理領域有所作為,企業希望利用Hadoop來規劃其自身未來資料處理的藍圖。從EMC、Oracle到Microsoft,幾乎所有高科技廠商都宣佈了自己以Hadoop為基礎的大資料戰略。現今Hadoop已經成為IT商場吸引客戶的熱點詞彙。