DFSClient技術內幕 (DFSClient介紹以及其初始化)
阿新 • • 發佈:2019-02-18
|--------DataStreamer extends Daemon (extends Thread)
|--------ResponseProcessor extends Thread
LeaseChecker介紹:實現了Runnable介面
在HDFS中可能有多個客戶端在同一時刻進行檔案的寫入操作,有時會出現多個客戶端併發的寫入一個檔案的情況,所以採取一些措施來控制併發寫入情況的傳送,一般情況下會採用互斥鎖的方法來進行控制,使得每一時刻只有一個獲得鎖的客戶端才能執行,寫入操作。但是互斥鎖的機制在分散式系統中會有很多問題
問題一:每次執行寫入時,客戶端都需要向NameNode申請互斥鎖,從而造成網路開銷的增大
問題二:當某個客戶端獲得鎖之後和NameNode失去了聯絡,此時會造成互斥鎖無法釋放,使得其他的客戶端的操作會被終止
解決方案:HDFS使用Lease租約來解決互斥鎖的問題
過程:當DFSClient需要對一個檔案執行寫入操作時,他首先需要向NameNode申請一個租約(有時間限制),在時間期限內客戶端可以 對租約所管理的檔案執行寫入。一個檔案只能被一個租約鎖管理,所以只能有一個客戶端對檔案執行寫入操作,在租約的有效時間 內,DFSClient客戶端會一直持有寫檔案的許可權,而不需要再向NameNode詢問是否有寫檔案的許可權。當客戶端一直工作時,它會在 租約過期後向NameNode申請續約,入股在租約的有效期間內,客戶端發生了異常,和NameNode失去了聯絡,當租約期滿後, NameNode會發現發生異常的客戶端,此時NameNode會將新的租約賦給其它正常的客戶端,當發生異常的客戶端已經寫入了一部 分資料時,HDFS為了分辨出這些無用的資料,會在客戶端每次寫入資料時增加版本號資訊,異常的客戶端的寫入的資料的版本號 會很低,從而可以被安全刪除掉。
LeaseChecker作用: 在DFSClient中有個LeaseChecker執行緒,該執行緒會週期性的檢查租約是否過期,在租約快過期的時候會對租約進行續約,此外,在namenode包中有個LeaseManager租約管理器,該管理器會不斷的檢查它所管理的lease是否過期,如果lease已經過期,會將其刪除
DNAddrPair介紹:封裝了定位到的DataNode資訊和DataNode所對應的IP資訊
FSInputChecker 介紹:抽象類FSInputChecker繼承自FSInputStream,加入了HDFS所需要的校驗功能,hadoop會生成與原生檔案所對應的校驗和檔案,並在讀寫檔案的時候對檔案進行校驗, 以確保資料的準確性
BlockReader介紹:BlockReader 繼承自 FSInputChecker 繼承自 FSInputStream,校驗功能是在readChecksumChunk方法中實現,而readChecksumChunk私有方法是被read1私有方法內部呼叫,而且所有的read方法的都是通過間接地呼叫read1方法來實現對資料進行讀取並做校驗和驗證的
DFSInputStream介紹:繼承自FSInputStream,該類會建立到DataNode的Socket連線,然後使用Socket來讀取DataNode上的資料資訊
DFSDataInputStream介紹:繼承自DataInputStream,DFSDataInputStream的功能都依靠包裝的DFSInputStream來完成
DFSOutputStream介紹:繼承自DFSOutputStream
Packet介紹:資料包,DFSOutputStream的內部類,DFSClient是通過一個個Packet來向DataNode寫入資料的,一個Packet由多個數據chunk組成,每個chunk對應著一個校驗和,當寫入足夠的chunk之後,packet會被新增到dataQueue中
DataStreamer 介紹:DataStreamer是真正寫入資料的程序,在傳送Packet之前,它會首先從Namenode中獲得一個blockid和Block的位置資訊,然後它會迴圈地從dataQueue中取得一個Packet,然後將該Packet真正寫入到與DataNode所建立的socket中, 當將屬於一個Block的所有Packet都發送給DataNode,並且返回了與每個Packet所對應的響應資訊之後,DataStream 會關閉當前的資料Block
ResponseProcessor 介紹:響應處理器ResponseProcessor
至此,DFSClient討論完畢
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DFSClient構造器群:
public DFSClient(Configuration conf) throws IOException { this(NameNode.getAddress(conf), conf); } public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf ) throws IOException { this(nameNodeAddr, conf, null); }
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, FileSystem.Statistics stats) throws IOException { this(nameNodeAddr, null, conf, stats); }
DFSClient為使用者提供了簡單,一致的標準訪問介面下面,但其內部實現較為複雜,本人陪同大家一起一起去探索這個神奇的國度
我們需要構建一個DFSClient物件:
DFSClient提供了4種形式的構造器,構造方法的主要任務有兩個:
a,讀入配置項並初始化一些成員變數
b,建立和名位元組點的IPC連線
詳細過程分析:a過程被初始化物件如下:
1,配置物件configuration,
2,收集檔案系統統計資訊的物件,
3,socket連線的過期時間,
4,寫入的資料包的大小
5,通過socket向dataNode寫入資料的超期時間
6,建立socket連線的工廠類7, 使用者組資訊
8, 最大塊獲取失敗次數
9,客戶端的名稱(如果該任務是Map-reduces任務,則使用任務ID作為客戶端名稱)
9, 預設的塊大小(64M),預設的塊副本數
構造器程式碼如下:
/** * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode. * Exactly one of nameNodeAddr or rpcNamenode must be null. */ DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { this.conf = conf; this.stats = stats; this.socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT); this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout", HdfsConstants.WRITE_TIMEOUT); this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); // dfs.write.packet.size is an internal config variable this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); try { this.ugi = UnixUserGroupInformation.login(conf, true); } catch (LoginException e) { throw (IOException)(new IOException().initCause(e)); } String taskId = conf.get("mapred.task.id"); if (taskId != null) { //如果是MapReduce任務,則客戶端名稱為任務Id號,否則取隨機號 this.clientName = "DFSClient_" + taskId; } else { this.clientName = "DFSClient_" + r.nextInt(); } defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
//非常關鍵的一步
通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol this.namenode = createNamenode(this.rpcNamenode); } else if (nameNodeAddr == null && rpcNamenode != null) { //This case is used for testing. this.namenode = this.rpcNamenode = rpcNamenode; } else { throw new IllegalArgumentException( "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); } }
b過程 :
通過呼叫私有方法createNamenode建立與名位元組點的連線,方法內部通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol
至此,DFSClient初始化完成
注:對於檔案系統,本文的討論中一直區分兩種情況,namenode的遠端方法不在本文討論範圍內
a,檔案和目錄相關事務(都使用遠端介面客戶端namenode,呼叫其同名遠端方法)
b,資料塊讀寫
讀寫資料過程請參考http://user.qzone.qq.com/578333569/infocenter#!app=2&via=QZ.HashRefresh&pos=1383552016
|--------ResponseProcessor extends Thread
LeaseChecker介紹:實現了Runnable介面
在HDFS中可能有多個客戶端在同一時刻進行檔案的寫入操作,有時會出現多個客戶端併發的寫入一個檔案的情況,所以採取一些措施來控制併發寫入情況的傳送,一般情況下會採用互斥鎖的方法來進行控制,使得每一時刻只有一個獲得鎖的客戶端才能執行,寫入操作。但是互斥鎖的機制在分散式系統中會有很多問題
問題一:每次執行寫入時,客戶端都需要向NameNode申請互斥鎖,從而造成網路開銷的增大
問題二:當某個客戶端獲得鎖之後和NameNode失去了聯絡,此時會造成互斥鎖無法釋放,使得其他的客戶端的操作會被終止
解決方案:HDFS使用Lease租約來解決互斥鎖的問題
過程:當DFSClient需要對一個檔案執行寫入操作時,他首先需要向NameNode申請一個租約(有時間限制),在時間期限內客戶端可以 對租約所管理的檔案執行寫入。一個檔案只能被一個租約鎖管理,所以只能有一個客戶端對檔案執行寫入操作,在租約的有效時間 內,DFSClient客戶端會一直持有寫檔案的許可權,而不需要再向NameNode詢問是否有寫檔案的許可權。當客戶端一直工作時,它會在 租約過期後向NameNode申請續約,入股在租約的有效期間內,客戶端發生了異常,和NameNode失去了聯絡,當租約期滿後, NameNode會發現發生異常的客戶端,此時NameNode會將新的租約賦給其它正常的客戶端,當發生異常的客戶端已經寫入了一部 分資料時,HDFS為了分辨出這些無用的資料,會在客戶端每次寫入資料時增加版本號資訊,異常的客戶端的寫入的資料的版本號 會很低,從而可以被安全刪除掉。
LeaseChecker作用: 在DFSClient中有個LeaseChecker執行緒,該執行緒會週期性的檢查租約是否過期,在租約快過期的時候會對租約進行續約,此外,在namenode包中有個LeaseManager租約管理器,該管理器會不斷的檢查它所管理的lease是否過期,如果lease已經過期,會將其刪除
DNAddrPair介紹:封裝了定位到的DataNode資訊和DataNode所對應的IP資訊
FSInputChecker 介紹:抽象類FSInputChecker繼承自FSInputStream,加入了HDFS所需要的校驗功能,hadoop會生成與原生檔案所對應的校驗和檔案,並在讀寫檔案的時候對檔案進行校驗,
BlockReader介紹:BlockReader 繼承自 FSInputChecker 繼承自 FSInputStream,校驗功能是在readChecksumChunk方法中實現,而readChecksumChunk私有方法是被read1私有方法內部呼叫,而且所有的read方法的都是通過間接地呼叫read1方法來實現對資料進行讀取並做校驗和驗證的
DFSInputStream介紹:繼承自FSInputStream,該類會建立到DataNode的Socket連線,然後使用Socket來讀取DataNode上的資料資訊
DFSDataInputStream介紹:繼承自DataInputStream,DFSDataInputStream的功能都依靠包裝的DFSInputStream來完成
DFSOutputStream介紹:繼承自DFSOutputStream
Packet介紹:資料包,DFSOutputStream的內部類,DFSClient是通過一個個Packet來向DataNode寫入資料的,一個Packet由多個數據chunk組成,每個chunk對應著一個校驗和,當寫入足夠的chunk之後,packet會被新增到dataQueue中
DataStreamer 介紹:DataStreamer是真正寫入資料的程序,在傳送Packet之前,它會首先從Namenode中獲得一個blockid和Block的位置資訊,然後它會迴圈地從dataQueue中取得一個Packet,然後將該Packet真正寫入到與DataNode所建立的socket中, 當將屬於一個Block的所有Packet都發送給DataNode,並且返回了與每個Packet所對應的響應資訊之後,DataStream 會關閉當前的資料Block
ResponseProcessor 介紹:響應處理器ResponseProcessor
至此,DFSClient討論完畢
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
DFSClient構造器群:
public DFSClient(Configuration conf) throws IOException { this(NameNode.getAddress(conf), conf); } public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf ) throws IOException { this(nameNodeAddr, conf, null); }
public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf, FileSystem.Statistics stats) throws IOException { this(nameNodeAddr, null, conf, stats); }
DFSClient為使用者提供了簡單,一致的標準訪問介面下面,但其內部實現較為複雜,本人陪同大家一起一起去探索這個神奇的國度
我們需要構建一個DFSClient物件:
DFSClient提供了4種形式的構造器,構造方法的主要任務有兩個:
a,讀入配置項並初始化一些成員變數
b,建立和名位元組點的IPC連線
詳細過程分析:a過程被初始化物件如下:
1,配置物件configuration,
2,收集檔案系統統計資訊的物件,
3,socket連線的過期時間,
4,寫入的資料包的大小
5,通過socket向dataNode寫入資料的超期時間
6,建立socket連線的工廠類7, 使用者組資訊
8, 最大塊獲取失敗次數
9,客戶端的名稱(如果該任務是Map-reduces任務,則使用任務ID作為客戶端名稱)
9, 預設的塊大小(64M),預設的塊副本數
構造器程式碼如下:
/** * Create a new DFSClient connected to the given nameNodeAddr or rpcNamenode. * Exactly one of nameNodeAddr or rpcNamenode must be null. */ DFSClient(InetSocketAddress nameNodeAddr, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { this.conf = conf; this.stats = stats; this.socketTimeout = conf.getInt("dfs.socket.timeout", HdfsConstants.READ_TIMEOUT); this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout", HdfsConstants.WRITE_TIMEOUT); this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class); // dfs.write.packet.size is an internal config variable this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); try { this.ugi = UnixUserGroupInformation.login(conf, true); } catch (LoginException e) { throw (IOException)(new IOException().initCause(e)); } String taskId = conf.get("mapred.task.id"); if (taskId != null) { //如果是MapReduce任務,則客戶端名稱為任務Id號,否則取隨機號 this.clientName = "DFSClient_" + taskId; } else { this.clientName = "DFSClient_" + r.nextInt(); } defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); if (nameNodeAddr != null && rpcNamenode == null) { this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
//非常關鍵的一步
通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol this.namenode = createNamenode(this.rpcNamenode); } else if (nameNodeAddr == null && rpcNamenode != null) { //This case is used for testing. this.namenode = this.rpcNamenode = rpcNamenode; } else { throw new IllegalArgumentException( "Expecting exactly one of nameNodeAddr and rpcNamenode being null: " + "nameNodeAddr=" + nameNodeAddr + ", rpcNamenode=" + rpcNamenode); } }
b過程 :
通過呼叫私有方法createNamenode建立與名位元組點的連線,方法內部通過RetryProxy的create方法來建立NameNode的RPC客戶端ClientProtocol
至此,DFSClient初始化完成
注:對於檔案系統,本文的討論中一直區分兩種情況,namenode的遠端方法不在本文討論範圍內
a,檔案和目錄相關事務(都使用遠端介面客戶端namenode,呼叫其同名遠端方法)
b,資料塊讀寫
讀寫資料過程請參考http://user.qzone.qq.com/578333569/infocenter#!app=2&via=QZ.HashRefresh&pos=1383552016