hadoop初讀--寫資料時的資料流管道雙向機制
阿新 • • 發佈:2019-02-12
為了保證分散式資料的一致性和完整性,Hadoop寫資料流時使用了寫資料和應答的雙向機制。
這裡著重說明的是,反向應答其實分為兩部分:
1.寫請求應答:
正常情況下,這個應答會從管道的最後一個資料節點開始,往客戶端方向傳送。管道上的每一個節點都會等待這個應答,收到應答後才會開始接收資料;也就是說,客戶端也會等待這個應答,然後才開始傳送資料。這個應答是同步的,即直到收到應答後才會進行下一步。應答包的結構只有兩個欄位:返回碼和附加資訊;當返回碼是OP_STATUS_ERROR時,附加資訊提供了流中第一個出錯的資料節點的地址資訊。
2.寫資料應答:
客戶端通過資料流管道傳送資料,管道上的資料節點在接收資料並寫入磁碟後,需要給上游節點傳送確認包,以清除緩衝區的內容。確認包從最後一個資料節點發出,逆流而上,直達資料源頭。應答包對應的類是:
DataTransferProtocol.PipelineAck.
下面貼出簡化後的程式碼,並以註釋的方式說明幾個本人認為需要關注的地方:
1.寫請求應答處理的程式碼:
/**
 * Part 1: connect-ack (write-request reply) handling on the current datanode.
 *
 * When at least one downstream target exists, this node opens a socket to
 * targets[0] (the "mirror") and forwards the OP_WRITE_BLOCK request header to
 * it, with targets[1..] as the rest of the pipeline. When {@code client} is
 * non-empty, it then synchronously reads the mirror's reply: a status code plus
 * the first bad link.
 */
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
mirrorNode = targets[0].getName();
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
// Read and write timeouts are extended per downstream target, since the
// reply has to travel back through every node in the remaining pipeline.
int timeoutValue = datanode.socketTimeout +
(HdfsConstants.READ_TIMEOUT_EXTENSION * numTargets);
int writeTimeout = datanode.socketWriteTimeout +
(HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
mirrorOut = new DataOutputStream(
new BufferedOutputStream(
NetUtils.getOutputStream(mirrorSock, writeTimeout),
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
// Forward the write-block request header to the mirror. The target list
// sent downstream omits targets[0], because that is the mirror itself.
mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
mirrorOut.writeLong( block.getBlockId() );
mirrorOut.writeLong( block.getGenerationStamp() );
mirrorOut.writeInt( pipelineSize );
mirrorOut.writeBoolean( isRecovery );
Text.writeString( mirrorOut, client );
mirrorOut.writeBoolean(hasSrcDataNode);
if (hasSrcDataNode) {
srcDataNode.write(mirrorOut);
}
mirrorOut.writeInt( targets.length - 1 );
for ( int i = 1; i < targets.length; i++ ) {
targets[i].write( mirrorOut );
}
accessToken.write(mirrorOut);
blockReceiver.writeChecksumHeader(mirrorOut);
mirrorOut.flush();
// A connect ack is only exchanged when the request carries a non-empty
// client name. NOTE(review): an empty client presumably means the write
// was not initiated by a client — confirm against the caller.
if (client.length() != 0) {
/**
 * After mirrorOut.flush(), block synchronously on mirrorIn.readShort()
 * until the downstream connect ack (status, then firstBadLink) arrives.
 */
mirrorInStatus = mirrorIn.readShort();
firstBadLink = Text.readString(mirrorIn);
if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " + firstBadLink);
}
}
} catch (IOException e) {
/**
 * If an IOException occurs anywhere in the try block above (connecting the
 * mirror socket, sending the header, or reading the downstream ack), send
 * OP_STATUS_ERROR upstream. The extra-info field names the first failing
 * datanode.
 */
/**
 * Only here do the status and mirrorNode truly describe the current
 * datanode: this node could not reach its mirror, so mirrorNode is reported
 * as the first bad link.
 */
if (client.length() != 0) {
replyOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
Text.writeString(replyOut, mirrorNode);
replyOut.flush();
}
// Tear down the mirror connection and clear the references, so later code
// sees that there is no mirror.
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
// With a client waiting, the failure is propagated. Otherwise the write
// proceeds on this node alone, without the mirror.
if (client.length() > 0) {
throw e;
} else {
LOG.info(datanode.dnRegistration + ":Exception transfering block " +block + " to mirror " + mirrorNode +". continuing without the mirror.\n" + StringUtils.stringifyException(e));
}
}
}
/**
 * Part 2: forward the connect ack received from the next (downstream) node
 * back upstream.
 */
if (client.length() != 0) {
if (LOG.isDebugEnabled() || mirrorInStatus != DataTransferProtocol.OP_STATUS_SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
/**
 * mirrorInStatus and firstBadLink are really the downstream datanode's
 * status, read from mirrorIn in part 1 right after mirrorOut.flush().
 * When targets is empty they keep whatever values they were given outside
 * this excerpt (presumably a success status and an empty link — TODO
 * confirm).
 */
replyOut.writeShort(mirrorInStatus);
Text.writeString(replyOut, firstBadLink);
replyOut.flush();
}
2.寫資料應答處理的程式碼:
/**
 * Excerpt of the data-ack handling. The main logic lives in
 * PacketResponder.run(); unrelated parts are elided below.
 */
.............
short[] replies = null;
// replies[0] is always this node's own status; any following entries describe
// the downstream nodes.
if (mirrorError) { // no ack is read
// No ack could be read from downstream: report success for this node and
// an error for the downstream side.
replies = new short[2];
replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
replies[1] = DataTransferProtocol.OP_STATUS_ERROR;
} else {// build the success ack
/**
 * ack.getNumOfReplies(): the number of per-node results collected from the
 * downstream datanodes. A node with no targets has no downstream replies to
 * relay.
 */
short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
replies = new short[1+ackLen];
replies[0] = DataTransferProtocol.OP_STATUS_SUCCESS;
for (int i=0; i<ackLen; i++) {
// append the downstream replies after this node's own status
replies[i+1] = ack.getReply(i);
}
}
// NOTE(review): `expected` is presumably the sequence number of the packet
// being acknowledged — confirm against PipelineAck's constructor.
PipelineAck replyAck = new PipelineAck(expected, replies);
replyAck.write(replyOut);// send the ack upstream
replyOut.flush();
if (LOG.isDebugEnabled()) {
LOG.debug("PacketResponder " + numTargets +" for block " + block +" responded an ack: " + replyAck);
}
...........