1. 程式人生 > >HDFS資料的讀寫過程

HDFS資料的讀寫過程

1.資料讀取過程

一般的檔案讀取操作包括:open 、read、close等 

客戶端讀取資料過程,其中1、3、6步由客戶端發起:

客戶端首先獲取FileSystem的一個例項,這裡就是HDFS對應的例項:

①客戶端呼叫FileSystem例項的open方法,獲得這個檔案對應的輸入流,在HDFS中就是DFSInputStream

②構造第一步中的輸入流DFSInputStream時,通過RPC遠端呼叫NameNode可以獲得NameNode中此檔案對應的資料塊的儲存位置,包括這個檔案的副本的儲存位置(主要是各DataNode的地址)。注意,在輸入流中會按照網路拓撲結構,根據與客戶端距離對DataNode進行簡單排序

③④獲得此輸入流之後,客戶端呼叫read方法讀取資料。輸入流DFSInputStream會根據前面的排序結果,選擇最近的DataNode建立連線並讀取資料。如果客戶端和其中一個DataNode位於一個機器中(比如MapReduce過程中的mapper和reducer),那麼就會直接從本地讀取資料。

⑤如果已達到資料塊末端,那麼關閉與這個DataNode的連線,然後重新查詢下一個資料塊。

不斷執行②-⑤直到資料全部讀完,然後呼叫close

⑥客戶端呼叫close,關閉輸入流DFSInputStream

另外如果DFSInputStream和DataNode通訊時出現錯誤,或者是資料校驗出錯,那麼DFSInputStream就會重新選擇DataNode傳輸資料。

2.資料寫入過程

 

客戶端寫入資料過程,其中1、3、6由客戶端發起

客戶端首先要獲取FileStream的一個例項,這裡就是HDFS的例項,

①②客戶端呼叫FileSystem例項的create方法,建立檔案。NameNode通過檢查,比如檔案是否存在,客戶端是否擁有建立許可權等;通過檢查之後,在NameNode新增檔案資訊。注意,因為此時檔案沒有資料,所以NameNode上也沒有檔案資料塊資訊。建立結束後,HDFS會返回一個輸出流DFSDataOutputStream給客戶端。

③客戶端呼叫輸出流DFSDataOutputStream的write方法向HDFS中對應的檔案寫入資料。資料首先會被分包,這些分包會寫入一個輸入流內部佇列Data佇列中,接收完整資料分包,輸出流DFSDataOutputStream會向nameNode申請儲存檔案和副本資料塊的若干個DataNode,這若個個DataNode會形成一個數據傳輸管道。

④DFSDataOutputStream會(根據網路拓撲結構排序)將資料傳輸給距離上最短的DataNode,這個DataNode接收到資料包之後會傳遞給下一個DataNode,資料在各DataNode之間通過管道流動,而不是全部由輸出流分發,這樣可以減少傳輸開銷。

⑤因為DataNode位於不同機器上,資料需要通過網路傳送,所以,為了保證所有的DataNode的資料都是準確的,接收到資料的DataNode要向傳送者傳送確認包(ACKPacket)。對於某個資料塊,只有當DFSDataOutputStream收到了所有DataNode的正確ACK,才能確認傳輸結束。DFSDataOutputStream內部專門維護了一個等待ACK佇列,這一佇列儲存已經進入管道傳輸資料、但是並未被完全確認的資料包。

   不斷③-⑤直到資料全部寫完,客戶端呼叫close關閉檔案。

⑥客戶端呼叫close方法,DFSDataOutputStream繼續等待直到所有資料寫入完畢並被確認,呼叫complete方法通知NameNode檔案寫入完成。

⑦NameNode接收到complete訊息之後,等待相應數量的副本寫入完畢後,告知客戶端即可。

3、在傳輸過程中,如果發現某個DataNode失效(未聯通,ACK超時),那麼HDFS執行如下操作:

①關閉資料傳輸的管道

②將等待ACK佇列中的資料放到Data佇列的頭部

③更新正常DataNode中所有資料塊的版本;當失效的DataNode重啟之後,之前的資料塊會因為版本不對而被清除。

④在傳輸管道中刪除失效DataNode,重新建立管道併發送資料包。

3.HDFS基本操作常用API 

1.建立檔案

public FSDataOutputStream create(Path f) throws IOException;

 這裡會返回一個FSDataOutputStream物件,通過這個物件來調入write方法

2.開啟檔案

public FSDataInputStream open(Path f) throws IOException ;

這裡會返回 FSDataInputStream物件,通過這個物件呼叫read方法

3.獲取檔案資訊

public FileStatus getFileStatus(Path f) throws IOException;

這裡返回 FileStatus物件,其中儲存了檔案很多資訊

4.獲取目錄資訊

public FileStatus[] listStatus(Path f) throws IOException;

5.讀取

public int read(long position, byte[] buffer, int offset, int length) throws IOException;

6.寫入

public void write(byte[] b, int off, int len) throws IOException

7.關閉

public void close() throws IOException;

8.刪除

public boolean delete(Path f, boolean recursive) throws IOException;

參考 :《深入理解大資料 大資料處理與程式設計實踐》