1. 程式人生 > >DFSClient技術內幕 (DFSClient介紹以及其初始化)

DFSClient技術內幕 (DFSClient介紹以及其初始化)

              |--------DataStreamer extends Daemon (extends Thread)
              |--------ResponseProcessor extends Thread


LeaseChecker介紹:實現了Runnable介面
在HDFS中可能有多個客戶端在同一時刻進行檔案的寫入操作,有時會出現多個客戶端併發的寫入一個檔案的情況,所以採取一些措施來控制併發寫入情況的傳送,一般情況下會採用互斥鎖的方法來進行控制,使得每一時刻只有一個獲得鎖的客戶端才能執行,寫入操作。但是互斥鎖的機制在分散式系統中會有很多問題
問題一:每次執行寫入時,客戶端都需要向NameNode申請互斥鎖,從而造成網路開銷的增大
問題二:當某個客戶端獲得鎖之後和NameNode失去了聯絡,此時會造成互斥鎖無法釋放,使得其他的客戶端的操作會被終止
解決方案:HDFS使用Lease租約來解決互斥鎖的問題

過程:當DFSClient需要對一個檔案執行寫入操作時,他首先需要向NameNode申請一個租約(有時間限制),在時間期限內客戶端可以             對租約所管理的檔案執行寫入。一個檔案只能被一個租約鎖管理,所以只能有一個客戶端對檔案執行寫入操作,在租約的有效時間             內,DFSClient客戶端會一直持有寫檔案的許可權,而不需要再向NameNode詢問是否有寫檔案的許可權。當客戶端一直工作時,它會在          租約過期後向NameNode申請續約,入股在租約的有效期間內,客戶端發生了異常,和NameNode失去了聯絡,當租約期滿後,                NameNode會發現發生異常的客戶端,此時NameNode會將新的租約賦給其它正常的客戶端,當發生異常的客戶端已經寫入了一部             分資料時,HDFS為了分辨出這些無用的資料,會在客戶端每次寫入資料時增加版本號資訊,異常的客戶端的寫入的資料的版本號            會很低,從而可以被安全刪除掉。

LeaseChecker作用: 在DFSClient中有個LeaseChecker執行緒,該執行緒會週期性的檢查租約是否過期,在租約快過期的時候會對租約進行續約,此外,在namenode包中有個LeaseManager租約管理器,該管理器會不斷的檢查它所管理的lease是否過期,如果lease已經過期,會將其刪除

DNAddrPair介紹:封裝了定位到的DataNode資訊和DataNode所對應的IP資訊

FSInputChecker 介紹:
抽象類FSInputChecker繼承自FSInputStream,加入了HDFS所需要的校驗功能,hadoop會生成與原生檔案所對應的校驗和檔案,並在讀寫檔案的時候對檔案進行校驗,
以確保資料的準確性

BlockReader介紹:BlockReader 繼承自 FSInputChecker  繼承自  FSInputStream,校驗功能是在readChecksumChunk方法中實現,而readChecksumChunk私有方法是被read1私有方法內部呼叫,而且所有的read方法的都是通過間接地呼叫read1方法來實現對資料進行讀取並做校驗和驗證的

DFSInputStream介紹:繼承自FSInputStream,該類會建立到DataNode的Socket連線,然後使用Socket來讀取DataNode上的資料資訊

DFSDataInputStream介紹:繼承自DataInputStream,DFSDataInputStream的功能都依靠包裝的DFSInputStream來完成

DFSOutputStream介紹:繼承自DFSOutputStream


Packet介紹:資料包,DFSOutputStream的內部類,DFSClient是通過一個個Packet來向DataNode寫入資料的,一個Packet由多個數據chunk組成,每個chunk對應著一個校驗和,當寫入足夠的chunk之後,packet會被新增到dataQueue中

DataStreamer 介紹:DataStreamer是真正寫入資料的程序,在傳送Packet之前,它會首先從Namenode中獲得一個blockid和Block的位置資訊,然後它會迴圈地從dataQueue中取得一個Packet,然後將該Packet真正寫入到與DataNode所建立的socket中, 當將屬於一個Block的所有Packet都發送給DataNode,並且返回了與每個Packet所對應的響應資訊之後,DataStream 會關閉當前的資料Block

ResponseProcessor 介紹:響應處理器ResponseProcessor

至此,DFSClient討論完畢
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DFSClient構造器群:
  public DFSClient(Configuration conf) throws IOException {     this(NameNode.getAddress(conf), conf);   }  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf       ) throws IOException {    this(nameNodeAddr, conf, null);   }
  
   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,                    FileSystem.Statistics stats)     throws IOException {     this(nameNodeAddr, null, conf, stats);   }   

DFSClient為使用者提供了簡單,一致的標準訪問介面下面,但其內部實現較為複雜,本人陪同大家一起一起去探索這個神奇的國度
我們需要構建一個DFSClient物件:
DFSClient提供了4種形式的構造器,構造方法的主要任務有兩個:
a,讀入配置項並初始化一些成員變數
b,建立和名位元組點的IPC連線

詳細過程分析:a過程被初始化物件如下:
1,配置物件configuration,
2,
收集檔案系統統計資訊的物件,
3,socket連線的過期時間,
4,寫入的資料包的大小
5,通過socket向dataNode寫入資料的超期時間
6,建立socket連線的工廠類7, 使用者組資訊
8, 最大塊獲取失敗次數
9,客戶端的名稱(如果該任務是Map-reduces任務,則使用任務ID作為客戶端名稱)
9, 預設的塊大小(64M),預設的塊副本數
 

構造器程式碼如下:
  /**     * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode.    * Exactly one of nameNodeAddr or rpcNamenode must be null.    */   DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode,       Configuration conf, FileSystem.Statistics stats)     throws IOException {     this.conf = conf;     this.stats = stats;     this.socketTimeout = conf.getInt("dfs.socket.timeout",                                       HdfsConstants.READ_TIMEOUT);     this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",                                             HdfsConstants.WRITE_TIMEOUT);     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);     // dfs.write.packet.size is an internal config variable     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);     this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);     try {       this.ugi = UnixUserGroupInformation.login(conf, true);     } catch (LoginException e) {       throw (IOException)(new IOException().initCause(e));     }     String taskId = conf.get("mapred.task.id");     if (taskId != null) {     //如果是MapReduce任務,則客戶端名稱為任務Id號,否則取隨機號       this.clientName = "DFSClient_" + taskId;      } else {       this.clientName = "DFSClient_" + r.nextInt();     }     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);     defaultReplication = (short) conf.getInt("dfs.replication", 3);     if (nameNodeAddr != null && rpcNamenode == null) {       this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
      //非常關鍵的一步 
         
通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol       this.namenode = createNamenode(this.rpcNamenode);     } else if (nameNodeAddr == null && rpcNamenode != null) {       //This case is used for testing.       this.namenode = this.rpcNamenode = rpcNamenode;     } else {       throw new IllegalArgumentException(           "Expecting exactly one of nameNodeAddr and rpcNamenode being null: "           + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode);     }   }





b過程 :
通過呼叫私有方法createNamenode建立與名位元組點的連線,方法內部通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol
至此,DFSClient初始化完成 
注:對於檔案系統,本文的討論中一直區分兩種情況,namenode的遠端方法不在本文討論範圍內
a,檔案和目錄相關事務(都使用遠端介面客戶端namenode,呼叫其同名遠端方法)
b,資料塊讀寫 

讀寫資料過程請參考
http://user.qzone.qq.com/578333569/infocenter#!app=2&via=QZ.HashRefresh&pos=1383552016