讀HDFS書筆記---5.3 檔案短路讀操作
這一節的目錄:
5.3 檔案短路讀操作
5.3.1 短路讀共享記憶體
5.3.2 DataTransferProtocol
5.3.3 DFSClient短路讀操作流程
5.3.4 Datanode短路讀操作流程
在HDFS早期版本中,本地讀取和遠端讀取的實現是一樣的,客戶端通過TCP套接字連線Datanode,並通過DataTransferProtocol傳輸資料。這種方式很簡單,但是有一些不好的地方,例如Datanode需要為每個讀取資料塊的客戶端都維持一個執行緒和TCP套接字。核心中TCP協議是有開銷的,DataTransferProtocol本身也有開銷,因此這種實現方式有值得優化的地方,早起版本讀操作流程圖如下:
既然客戶端和Datanode在同一臺機器上,那麼DFSClient可以跳過Datanode,直接讀取磁碟上的資料,目前HDFS的實現方案有兩種
<1>、FS-2246
Datanode將所有的資料路徑權開發給客戶端,當執行一個本地讀取時,客戶端直接從本地磁碟的資料路徑讀取資料。但這種實現方式帶來了安全問題,客戶端使用者可以直接瀏覽所有資料,這會存在一定的不安全性,不是一個好選擇,下圖是相應的方案流程圖
<2> HDFS-347
UNIX提供了一種UNIX Domain Socket程序間通訊方式,它使得同一臺機器上的兩個程序能以Socket的方式通訊,並且還可以在程序間傳遞檔案描述符。
HDFS-347使用該機制實現了安全的本地短路讀取,如下圖所示:
客戶端向Datanode請求資料時,Datanode會開啟塊檔案和校驗和檔案,將這兩個檔案的檔案描述符直接傳遞給客戶端,而不是將路徑傳遞給客戶端。客戶端接收到這兩個檔案的檔案描述符之後,就可以直接開啟檔案讀取資料了,也就是繞過了Datanode程序的轉發,提高了讀取效率。因為檔案描述符是隻讀的,所以客戶端不能修改該檔案,同時,由於客戶端自身無法訪問資料塊檔案所在的目錄,所以它也就不能訪問其他不該訪問的資料了,保證了讀取的安全性。HDFS 2.x採取的就是HDFS-347的設計實現短路讀取功能的。
5.3.1 短路讀共享記憶體
瞭解了短路讀取的概念之後,我們看一下HDFS是如何實現這種模式的,在DFSClient中,使用ShortCircuitReplica類封裝可以進行短路讀取的副本。ShortCircuitReplica物件中包含了短路讀取副本的資料塊檔案輸入流、校驗檔案輸入流、短路讀取副本在共享記憶體中的槽位(slot)以及副本的引用次數等資訊。DFSClient會持有一個ShortCircuitCache物件快取並管理所有的ShortCircuitReplica物件,DFSClient從ShortCircuitCache獲得了ShortCircuitReplica的引用之後,就可以構造BlockReaderLocal物件進行本地讀取操作了。
如圖5-15所示,當DFSClient和Datanode在同一臺機器上時,需要一個共享記憶體段來維護所有短路讀取副本的狀態,共享記憶體段中會有很多槽位,每個槽位都記錄了一個短路讀取副本的資訊,例如當前副本是否有效、錨(anchor)的次數等。當Datanode將一個數據塊副本快取到記憶體中時,會將這個資料塊副本設定為可錨(anchorable)狀態,也就是在共享記憶體中該副本對應的槽位上設定可錨狀態位。當一個副本被設定為可錨狀態之後,DFSClient的BlockReaderLocal物件讀取該副本時就不需要校驗了(因為快取中的副本已經執行過校驗操作),並且輸入流可以通過零拷貝模式讀取這個副本。每當客戶端進行這兩種讀取操作時,都需要在副本對應的槽位上新增一個錨計數,只有副本的錨計數為零時,Datanode才可以從快取中刪除這個副本。可以看到共享記憶體以及槽位機制很好地在Datanode程序和DFSClient程序間同步了副本的狀態,保證了Datanode快取操作以及DFSClient讀取副本操作的正確性。
如圖5-16所示,共享記憶體機制是由DFSClient和Datanode對同一個檔案執行記憶體對映操作實現的。因為MappedByteBuffer物件能讓記憶體與物理檔案的資料實時同步,所以DFSClient和Datanode程序會通過中間檔案來交換資料,中間檔案使得兩個程序的記憶體區域得到及時的同步。DFSClient和Datanode之間可能會有多段共享記憶體,所以DFSClient定義了DFSClientShm類抽象DFSClient側的一段共享記憶體,定義了DFSClientShmManager類管理所有的DFSClientShm物件,而Datanode則定義了RegisteredShm類抽象Datanode側的一段共享記憶體,同時定義了ShortCircuitRegistry類管理所有Datanode側的共享記憶體。
DFSClient會呼叫DataTransferProtocol.requestShortCircuitShm介面與Datanode協商建立一段共享記憶體,共享記憶體建立成功後,DFSClient和Datanode會分別構造DFSClientShm以及RegisteredShm物件維護這段共享記憶體。如圖5-17所示,共享記憶體中的檔案對映資料是實時同步的,它儲存了所有槽位二進位制資訊。但是對映資料中二進位制的槽位資訊並不便於操作,所以DFSClientShm和RegisteredShm會構造一個Slot物件操作對映資料中的一個槽位,同時各自定義了集合欄位儲存所有的Slot物件。這裡需要特別注意的是,Slot物件會由DFSClientShm和RegisteredShm分別構造並儲存在各自的集合欄位中,所以DFSClientShm和RegisteredShm之間需要同步Slot物件的建立和刪除操作,以保證DFSClientShm和RegisteredShm儲存的Slot物件資訊是完全同步的。DataTransferProtocol介面就提供了requestShortCircuitFds()以及releaseShortCircuitFds()方法同步Slot物件的建立和刪除操作。
5.3.2 DataTransferProtocol
DataTransferProtocol定義了requestShortCircuitShm()、requestShortCircuitFds()以及releaseShortCircuitFds()三個介面方法同步Datanode和DFSClient對共享記憶體的操作。
需要注意的是DataTransferProtocol底層是基於Socket流的,而當DFSClient和Datanode在同一臺物理機器上時,DataTransferProtocol底層的Socket將會是DomainSocket,使用DomainSocket的DataTransferProtocol可以在Socket流中傳遞檔案描述符。
接下來我們依次看一下requestShortCircuitShm()、requestShortCircuitFds()以及releaseShortCircuitFds()方法的實現。
<1>、requestShortCircuitShm()
程式碼如下:
@Override
public void requestShortCircuitShm(String clientName) throws IOException {
NewShmInfo shmInfo = null;
boolean success = false;
DomainSocket sock = peer.getDomainSocket();//獲取底層DomainSocket物件
try {
if (sock == null) {//如果DataTransferProtocol底層不是DomainSocket,則發回異常
sendShmErrorResponse(ERROR_INVALID, "Bad request from " +
peer + ": must request a shared " +
"memory segment over a UNIX domain socket.");
return;
}
try {
//呼叫ShortCircuitRegistry.createNewMemorySegment()方法建立共享記憶體段
shmInfo = datanode.shortCircuitRegistry.
createNewMemorySegment(clientName, sock);
// After calling #{ShortCircuitRegistry#createNewMemorySegment}, the
// socket is managed by the DomainSocketWatcher, not the DataXceiver.
releaseSocket();
} catch (UnsupportedOperationException e) {//丟擲異常,則響應異常訊息
sendShmErrorResponse(ERROR_UNSUPPORTED,
"This datanode has not been configured to support " +
"short-circuit shared memory segments.");
return;
} catch (IOException e) {//丟擲異常,則響應異常訊息
sendShmErrorResponse(ERROR,
"Failed to create shared file descriptor: " + e.getMessage());
return;
}
//呼叫sendShmSuccessResponse()方法將共享記憶體檔案的檔案描述符發回客戶端
sendShmSuccessResponse(sock, shmInfo);
success = true;
} finally {
if (ClientTraceLog.isInfoEnabled()) {
if (success) {
BlockSender.ClientTraceLog.info(String.format(
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
"op: REQUEST_SHORT_CIRCUIT_SHM," +
" shmId: %016x%016x, srvID: %s, success: true",
clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(),
datanode.getDatanodeUuid()));
} else {
BlockSender.ClientTraceLog.info(String.format(
"cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " +
"op: REQUEST_SHORT_CIRCUIT_SHM, " +
"shmId: n/a, srvID: %s, success: false",
clientName, datanode.getDatanodeUuid()));
}
}
if ((!success) && (peer == null)) {
// If we failed to pass the shared memory segment to the client,
// close the UNIX domain socket now. This will trigger the
// DomainSocketWatcher callback, cleaning up the segment.
IOUtils.cleanup(null, sock);
}
IOUtils.cleanup(null, shmInfo);
}
}
DFSClient在執行任何短路讀取操作之前,需要先申請一段共享記憶體儲存短路讀取副本的狀態。DFSClient會呼叫DataTransferProtocol.requestShortCircuitShm()方法向Datanode發起申請共享記憶體的請求,Datanode的DataXceiver.requestShortCircuitShm()方法會響應這個請求。
如上面的程式碼所示,DataXceiver.requestShortCircuitShm()會呼叫ShortCircuitRegistry.createNewMemorySegment()方法建立共享記憶體段,createNewMemorySegment()方法會將共享記憶體檔案對映到Datanode的記憶體中,然後構造RegisteredShm類管理這段共享記憶體(請參考ShortCircuitRegistry類小節中的分析)。之後DataXceiver.requestShortCircuitShm()方法會呼叫sendShmSuccessResponse()方法將共享記憶體檔案的檔案描述符通過domainSocket發回客戶端。
DFSClient的DfsClientShmManager物件從domainSocket接收了共享記憶體檔案的檔案描述符後,會開啟共享記憶體檔案並將該檔案對映到DFSClient的記憶體中,之後建立DFSClientShm物件管理這段共享記憶體(DFSClientShmManager類小節中的分析),並將這個DFSClientShm物件儲存在DFSClientShmManager的對應欄位中。
<2>、requestShortCircuitFds()
程式碼如下:
@Override
public void requestShortCircuitFds(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> token,
SlotId slotId, int maxVersion) throws IOException {
updateCurrentThreadName("Passing file descriptors for block " + blk);
BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder();
FileInputStream fis[] = null;
try {
if (peer.getDomainSocket() == null) {//如果底層不是DomainSocket,則丟擲異常
throw new IOException("You cannot pass file descriptors over " +
"anything but a UNIX domain socket.");
}
if (slotId != null) {
boolean isCached = datanode.data.
isCached(blk.getBlockPoolId(), blk.getBlockId());
//呼叫ShortCircuitRegistry.registerSlot()方法在Datanode的共享記憶體中註冊這個Slot物件
datanode.shortCircuitRegistry.registerSlot(
ExtendedBlockId.fromExtendedBlock(blk), slotId, isCached);
}
try {
//獲取資料塊檔案以及校驗和檔案的檔案描述符
fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion);
} finally {
if ((fis == null) && (slotId != null)) {
datanode.shortCircuitRegistry.unregisterSlot(slotId);
}
}
//構造響應訊息
bld.setStatus(SUCCESS);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
} catch (ShortCircuitFdsVersionException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION);
bld.setMessage(e.getMessage());
} catch (ShortCircuitFdsUnsupportedException e) {
bld.setStatus(ERROR_UNSUPPORTED);
bld.setMessage(e.getMessage());
} catch (InvalidToken e) {
bld.setStatus(ERROR_ACCESS_TOKEN);
bld.setMessage(e.getMessage());
} catch (IOException e) {
bld.setStatus(ERROR);
bld.setMessage(e.getMessage());
}
try {
//發回成功的響應訊息
bld.build().writeDelimitedTo(socketOut);
if (fis != null) {
FileDescriptor fds[] = new FileDescriptor[fis.length];
for (int i = 0; i < fds.length; i++) {
fds[i] = fis[i].getFD();
}
byte buf[] = new byte[] { (byte)0 };
//通過DomainSocket將資料塊檔案和校驗和檔案的檔案描述符傳送給客戶端
peer.getDomainSocket().
sendFileDescriptors(fds, buf, 0, buf.length);
}
} finally {
if (ClientTraceLog.isInfoEnabled()) {
DatanodeRegistration dnR = datanode.getDNRegistrationForBP(blk
.getBlockPoolId());
BlockSender.ClientTraceLog.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: REQUEST_SHORT_CIRCUIT_FDS," +
" blockid: %s, srvID: %s, success: %b",
blk.getBlockId(), dnR.getDatanodeUuid(), (fis != null)
));
}
if (fis != null) {
IOUtils.cleanup(LOG, fis);
}
}
}
<3>、releaseShortCircuitFds()
程式碼如下:
@Override
public void releaseShortCircuitFds(SlotId slotId) throws IOException {
boolean success = false;
try {
String error;
Status status;
try {
//釋放共享記憶體中的槽位
datanode.shortCircuitRegistry.unregisterSlot(slotId);
error = null;
status = Status.SUCCESS;
} catch (UnsupportedOperationException e) {
error = "unsupported operation";
status = Status.ERROR_UNSUPPORTED;
} catch (Throwable e) {
error = e.getMessage();
status = Status.ERROR_INVALID;
}
//構造響應訊息
ReleaseShortCircuitAccessResponseProto.Builder bld =
ReleaseShortCircuitAccessResponseProto.newBuilder();
bld.setStatus(status);
if (error != null) {
bld.setError(error);
}
//發回響應訊息
bld.build().writeDelimitedTo(socketOut);
success = true;
} finally {
if (ClientTraceLog.isInfoEnabled()) {
BlockSender.ClientTraceLog.info(String.format(
"src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," +
" shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b",
slotId.getShmId().getHi(), slotId.getShmId().getLo(),
slotId.getSlotIdx(), datanode.getDatanodeUuid(), success));
}
}
}