Hadoop資料流原理+例項程式碼
從HDFS中讀取檔案test.txt
前提:啟動Hadoop的所有元件
⑴準備test.txt
查詢tetst.txt檔案是否存在:hadoop fs -ls hdfs://localhost/test/
注意:本人的test.txt檔案在/test目錄下,根據自己的實際查詢,如果沒有則建立一個。如下是test.txt檔案中的內容
[[email protected] /]$ hadoop fs -cat hdfs://localhost/test/test.txt
hello world
===========
-- name : liudong
⑵編寫程式碼(在windows下的eclipise中完成,注意jar包的匯入)
package com.dong.hello; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; public class FileSystemCat { public static void main(String[] args) throws IOException{ //從HDFS上讀取test.txt檔案,顯示到終端 String uri="hdfs://localhost/test/test.txt"; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); InputStream in = null; try{ in = fs.open(new Path(uri)); IOUtils.copyBytes(in, System.out, 4096,false); }finally{ IOUtils.closeStream(in); } } }
⑶匯出成jar包的形式(這裡一定要注意windows中JDK的版本和Hadoop中的JDK的版本一致,不然不能執行)
⑷上傳到hadoop所在的主機(這裡我的是Centos)(使用軟體為WinSCP)
⑸在Centos中使用hadoop命令進行執行
[[email protected] ~]$ hadoop jar hadoopHello.jar
hello world
===========
-- name : liudong
(如果現實沒有找到主類,則注意HADOOP_CLASSPATH的配置,或者jar包)
Hadoop資料流原理:
客戶端通過呼叫FileSystem物件的open()方法開啟目標檔案,從HDFS的角度出發:
DistributedFileSystem通過使用RFC呼叫namenode,來確定檔案起始塊的位置,namenode返回存有該該塊副本的datanode地址,如果客戶端本身就是一個datanode,那客戶端會從儲存有相應資料塊複本的本地datanode讀取資料。DistributedFileSystem類返回一個FSDataInputStream物件讓客戶端以便讀取資料,FSDataInputStream類封裝DFSInputStream物件,DSFInputStream物件管理著datanode,namenode的I/O.
然後客戶端對輸入流呼叫read()方法,儲存著檔案起始塊的datanode地址的DFSInputStream隨即連線距離最近的檔案中第一個塊所在的datanode.通過對資料流反覆呼叫read()方法,可以將資料從datanode傳輸到客戶端,當到達塊的末端時,DFSInputStream關閉與該datanode的連線,尋找下一個塊的最佳的datanode,當客戶端完成讀取,就對FSDataInputStream呼叫close()方法。
在讀取的過程,如果DFSInputStream在於datanode出錯,會嘗試從這個塊最近的datanode讀取資料,同時記住這個故障datanode,保證以後不會反覆讀取該節點上後續的塊。
客戶端通過呼叫create()方法來建立檔案,從HDFS的角度出發:
DistributedFileSystem對namenode建立一個RFC呼叫,在檔案系統的名稱空間中新建一個檔案,此時該檔案中還沒有相應的資料塊,namenode執行各種不同的檢查來確保這個檔案不存在以及客戶端是否建立檔案的許可權,如果這些檢查通過,namenode就會為建立新檔案記錄一條記錄,否則建立失敗,並像客戶端丟擲一個IOException異常。
DistributedFileSystem向客戶端返回一個FSDataOutputStream物件,由此客戶端可以開始寫入資料,在客戶端寫入資料時,DFSOutputStream將它分成一個個資料包,並且寫入內部佇列,稱為資料佇列,DataStreamer處理資料佇列,它負責挑選出合適儲存資料複本的一組datanode,並以此來要求namenode分配新的資料塊,這一組datanode構成一個管線,假設複本數為3,所以管線中有3個節點,DataStreamer將資料包流式傳輸到管線中第一個datanode,該datanode儲存資料包並將它傳送到管線中第二個datanode,同樣,第二個datanode儲存資料包並且傳送到管線中第三個datanode。
同使用DFSOutputStream維護一個內部資料包佇列來等待datanode的收到確認回執,稱為“確認佇列”,收到管道中所有datanode確認資訊後,該資料包才會從確認佇列刪除。如果datanode在資料寫入的時候發生故障,則先關閉管線,確認把佇列中所有資料包都添加回資料佇列的最前端,以確保故障節點下游的datanode不會漏掉任何一個數據包。同時將正常datanode的標識傳送給namenode,以便故障datanode在恢復後可以刪除儲存在部分資料塊。從管線中刪除故障datanode,基於兩個正常datanode構建一個新的管線,剩下的資料塊寫入管線中正常的datanode,namenode注意到塊副本量不足時,會在另一個節點上建立新的複本。客戶端完成資料的寫入後,對資料流呼叫close()方法。