Summary of Reading Files in HDFS
阿新 • Published: 2019-01-30
Mainly based on the Hadoop 1.0.3 source code.
I. Overview of the HDFS read process
1. Opening a file
1.1 Client
When HDFS opens a file, DistributedFileSystem.open(Path f, int bufferSize) is called. The method is at lines 152-156 of DistributedFileSystem.java,
as shown below:
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
statistics.incrementReadOps(1);
return new DFSClient.DFSDataInputStream(
dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
}
Here dfs is a member variable of DistributedFileSystem (of type DFSClient). The method dfs.open(String src, int buffersize, boolean verifyChecksum, FileSystem.Statistics stats) is at lines 573-579 of DFSClient.java, as shown below:
public DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
    FileSystem.Statistics stats) throws IOException {
  checkOpen();
  // Get block info from namenode
  return new DFSInputStream(src, buffersize, verifyChecksum);
}
As we can see, a DFSInputStream is created and returned. Its constructor is at lines 1828-1835 of DFSClient.java, as shown below:
DFSInputStream(String src, int buffersize, boolean verifyChecksum)
    throws IOException {
  this.verifyChecksum = verifyChecksum;
  this.buffersize = buffersize;
  this.src = src;
  prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
  openInfo();
}
As we can see, openInfo is called. Its code is at lines 1840-1861 of DFSClient.java, as shown below:
synchronized void openInfo() throws IOException {
LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
if (newInfo == null) {
throw new FileNotFoundException("File does not exist: " + src);
}
// I think this check is not correct. A file could have been appended to
// between two calls to openInfo().
if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
!newInfo.isUnderConstruction()) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
}
}
}
updateBlockInfo(newInfo);
this.locatedBlocks = newInfo;
this.currentNode = null;
}
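The consistency check inside openInfo can be looked at in isolation. The following is a minimal sketch, not the Hadoop implementation: the class BlockListCheck and its method are hypothetical, and blocks are represented only by their IDs. Like the loop in openInfo, it compares only the positions that both lists have, so a file that has grown by appending blocks still passes.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not part of Hadoop: models the pairwise comparison in openInfo().
final class BlockListCheck {
    // Returns true when the two block lists agree on every position they both have.
    static boolean samePrefix(List<Long> oldIds, List<Long> newIds) {
        Iterator<Long> oldIter = oldIds.iterator();
        Iterator<Long> newIter = newIds.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                return false; // openInfo() throws "Blocklist ... has changed!" at this point
            }
        }
        return true;
    }
}
```

For example, comparing the old list [1, 2] with the new list [1, 2, 3] passes, while comparing [1, 2] with [1, 9] does not.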
updateBlockInfo mainly does the following: for files under construction, it updates the last block size based on the length of the block reported by the datanode.
private void updateBlockInfo(LocatedBlocks newInfo) {
if (!serverSupportsHdfs200 || !newInfo.isUnderConstruction()
|| !(newInfo.locatedBlockCount() > 0)) {
return;
}
LocatedBlock last = newInfo.get(newInfo.locatedBlockCount() - 1);
boolean lastBlockInFile = (last.getStartOffset() + last.getBlockSize() == newInfo
.getFileLength());
if (!lastBlockInFile || last.getLocations().length <= 0) {
return;
}
ClientDatanodeProtocol primary = null;
DatanodeInfo primaryNode = last.getLocations()[0];
try {
primary = createClientDatanodeProtocolProxy(primaryNode, conf,
last.getBlock(), last.getBlockToken(), socketTimeout);
Block newBlock = primary.getBlockInfo(last.getBlock());
long newBlockSize = newBlock.getNumBytes();
long delta = newBlockSize - last.getBlockSize();
// if the size of the block on the datanode is different
// from what the NN knows about, the datanode wins!
last.getBlock().setNumBytes(newBlockSize);
long newlength = newInfo.getFileLength() + delta;
newInfo.setFileLength(newlength);
LOG.debug("DFSClient setting last block " + last + " to length "
+ newBlockSize + " filesize is now " + newInfo.getFileLength());
} catch (IOException e) {
if (e.getMessage().startsWith(
"java.io.IOException: java.lang.NoSuchMethodException: "
+ "org.apache.hadoop.hdfs.protocol"
+ ".ClientDatanodeProtocol.getBlockInfo")) {
// We're talking to a server that doesn't implement HDFS-200.
serverSupportsHdfs200 = false;
} else {
LOG.debug("DFSClient file " + src
+ " is being concurrently append to" + " but datanode "
+ primaryNode.getHostName() + " probably does not have block "
+ last.getBlock());
}
}
}
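The length adjustment in updateBlockInfo comes down to simple arithmetic: the datanode's size for the last block replaces the namenode's, and the file length shifts by the difference. A minimal sketch of just that step, with a hypothetical class and parameter names that do not appear in Hadoop:

```java
// Hypothetical helper, not part of Hadoop: isolates the delta computation of updateBlockInfo().
final class LastBlockLength {
    // Returns the corrected file length after trusting the datanode's size for the last block.
    static long correctedFileLength(long fileLengthFromNameNode,
                                    long lastBlockSizeFromNameNode,
                                    long lastBlockSizeFromDataNode) {
        long delta = lastBlockSizeFromDataNode - lastBlockSizeFromNameNode;
        return fileLengthFromNameNode + delta;
    }
}
```

If the namenode believes the file is 200 bytes long with a 72-byte last block, and the datanode reports 100 bytes for that block, the corrected length is 228.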
callGetBlockLocations is mainly used to fetch block information from the namenode:
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
String src, long start, long length) throws IOException {
try {
return namenode.getBlockLocations(src, start, length);
} catch(RemoteException re) {
throw re.unwrapRemoteException(AccessControlException.class,
FileNotFoundException.class);
}
}
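As we can see, the return value is a LocatedBlocks, which holds a list List<LocatedBlock> blocks; each LocatedBlock contains:
- Block b: the block's information
- long offset: the block's offset within the file
- DatanodeInfo[] locs: the datanodes that hold the block
1.2 NameNode
On the NameNode side, the request is served by getBlockLocations: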
public LocatedBlocks getBlockLocations(String src,
long offset,
long length) throws IOException {
myMetrics.incrNumGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
}
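namesystem is a member variable of NameNode, of type FSNamesystem. Let's step inside and see what it actually does; the call above ends up in the following overload: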
public LocatedBlocks getBlockLocations(String src, long offset, long length,
boolean doAccessTime, boolean needBlockToken) throws IOException {
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.READ);
}
if (offset < 0) {
throw new IOException("Negative offset is not supported. File: " + src );
}
if (length < 0) {
throw new IOException("Negative length is not supported. File: " + src );
}
final LocatedBlocks ret = getBlockLocationsInternal(src,
offset, length, Integer.MAX_VALUE, doAccessTime, needBlockToken);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
"open", src, null, null);
}
return ret;
}
The key part is getBlockLocationsInternal. Reading on, we find its implementation:
private synchronized LocatedBlocks getBlockLocationsInternal(String src,
long offset,
long length,
int nrBlocksToReturn,
boolean doAccessTime,
boolean needBlockToken)
throws IOException {
INodeFile inode = dir.getFileINode(src);
if(inode == null) {
return null;
}
if (doAccessTime && isAccessTimeSupported()) {
dir.setTimes(src, inode, -1, now(), false);
}
// get the blocks of this file
Block[] blocks = inode.getBlocks();
if (blocks == null) {
return null;
}
if (blocks.length == 0) {
return inode.createLocatedBlocks(new ArrayList<LocatedBlock>(blocks.length));
}
List<LocatedBlock> results;
results = new ArrayList<LocatedBlock>(blocks.length);
// find the blocks covering the range offset -> offset+length
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
for (curBlk = 0; curBlk < nrBlocks; curBlk++) {
blkSize = blocks[curBlk].getNumBytes();
assert blkSize > 0 : "Block of size 0";
if (curPos + blkSize > offset) {
// offset lies within this block, so curPos stays at the start of the current block
break;
}
curPos += blkSize;
}
if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file
return null;
long endOff = offset + length;
do {
// get block locations
int numNodes = blocksMap.numNodes(blocks[curBlk]);
int numCorruptNodes = countNodes(blocks[curBlk]).corruptReplicas();
int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blocks[curBlk]);
if (numCorruptNodes != numCorruptReplicas) {
LOG.warn("Inconsistent number of corrupt replicas for " +
blocks[curBlk] + "blockMap has " + numCorruptNodes +
" but corrupt replicas map has " + numCorruptReplicas);
}
// find the datanodes that hold this block
DatanodeDescriptor[] machineSet = null;
boolean blockCorrupt = false;
if (inode.isUnderConstruction() && curBlk == blocks.length - 1
&& blocksMap.numNodes(blocks[curBlk]) == 0) {
// get unfinished block locations
INodeFileUnderConstruction cons = (INodeFileUnderConstruction)inode;
machineSet = cons.getTargets();
blockCorrupt = false;
} else {
blockCorrupt = (numCorruptNodes == numNodes);
int numMachineSet = blockCorrupt ? numNodes :
(numNodes - numCorruptNodes);
machineSet = new DatanodeDescriptor[numMachineSet];
if (numMachineSet > 0) {
numNodes = 0;
for(Iterator<DatanodeDescriptor> it =
blocksMap.nodeIterator(blocks[curBlk]); it.hasNext();) {
DatanodeDescriptor dn = it.next();
boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blocks[curBlk], dn);
if (blockCorrupt || (!blockCorrupt && !replicaCorrupt))
machineSet[numNodes++] = dn;
}
}
}
// construct a LocatedBlock
LocatedBlock b = new LocatedBlock(blocks[curBlk], machineSet, curPos,
blockCorrupt);
if(isAccessTokenEnabled && needBlockToken) {
b.setBlockToken(accessTokenHandler.generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.READ)));
}
// add it to results
results.add(b);
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
&& curBlk < blocks.length
&& results.size() < nrBlocksToReturn);
// return the result
return inode.createLocatedBlocks(results);
}
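Stripped of replica and token handling, the core of getBlockLocationsInternal is a two-phase walk over the block sizes: skip the blocks that end at or before offset, then collect blocks until the end of the requested range is passed. The following is a rough sketch under simplifying assumptions: the class BlockRangeLookup is hypothetical, blocks are reduced to an array of sizes, and an empty list is returned where the original returns null.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: models only the offset arithmetic.
final class BlockRangeLookup {
    // Returns the indexes of the blocks that overlap [offset, offset + length).
    static List<Integer> blocksCovering(long[] blockSizes, long offset, long length) {
        List<Integer> result = new ArrayList<>();
        int cur = 0;
        long curPos = 0; // file offset at which block `cur` starts
        // Phase 1: skip every block that ends at or before offset.
        while (cur < blockSizes.length && curPos + blockSizes[cur] <= offset) {
            curPos += blockSizes[cur];
            cur++;
        }
        if (cur == blockSizes.length) {
            return result; // offset is at or past the end of the file (or the file is empty)
        }
        long endOff = offset + length;
        // Phase 2: collect blocks until the end of the requested range is passed.
        do {
            result.add(cur);
            curPos += blockSizes[cur];
            cur++;
        } while (curPos < endOff && cur < blockSizes.length);
        return result;
    }
}
```

With five blocks of size 64, offset 100 and length 128, the sketch returns the indexes 1, 2 and 3, that is, the second, third and fourth blocks.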
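1.3 Summary
As we can see, what is finally returned to the user is an FSDataInputStream. The whole "open file" process is completed by the client talking to the namenode. As I understand it, opening a file really just means collecting from the namenode the datanodes that hold each block, plus basic block information, for use by the subsequent read.
II. Reading a file
2.1 Client
When reading a file, DFSDataInputStream.read is called directly; the positional read it ends up in is shown here: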
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
// sanity checks
checkOpen();
if (closed) {
throw new IOException("Stream closed");
}
failures = 0;
long filelen = getFileLength();
if ((position < 0) || (position >= filelen)) {
return -1;
}
int realLen = length;
if ((position + length) > filelen) {
realLen = (int)(filelen - position);
}
// determine the block and byte range within the block
// corresponding to position and realLen
// get the list of blocks covering offset .. offset+length
// e.g. starting at 100M and reading 128M of data
// ■64■128■192■256■320 .....
// the blocks to read are the 2nd, 3rd and 4th
List<LocatedBlock> blockRange = getBlockRange(position, realLen);
int remaining = realLen;
// e.g. block 2 is read starting at 36M (100-64), block 3 is read in full (64M), and block 4 is read up to 36M (228M-192M)
for (LocatedBlock blk : blockRange) {
long targetStart = position - blk.getStartOffset();
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
fetchBlockByteRange(blk, targetStart,
targetStart + bytesToRead - 1, buffer, offset);
remaining -= bytesToRead;
position += bytesToRead;
offset += bytesToRead;
}
assert remaining == 0 : "Wrong number of bytes read.";
if (stats != null) {
stats.incrementBytesRead(realLen);
}
return realLen;
}
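The per-block arithmetic in read (targetStart and bytesToRead) can be checked against the 100M/128M example from the comments. The following is a minimal sketch, assuming a fixed block size for every block; the class PositionalReadPlan is hypothetical, and it only computes the byte ranges instead of fetching data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: computes which bytes of which block a read touches.
final class PositionalReadPlan {
    // One entry per block: {blockIndex, firstByteInBlock, lastByteInBlock}, both ends inclusive.
    static List<long[]> plan(long blockSize, long fileLength, long position, int length) {
        List<long[]> ranges = new ArrayList<>();
        if (position < 0 || position >= fileLength) {
            return ranges; // read() returns -1 in this case
        }
        // Clamp the request to the end of the file, like realLen in read().
        long remaining = Math.min(length, fileLength - position);
        while (remaining > 0) {
            long blockIndex = position / blockSize;
            long targetStart = position - blockIndex * blockSize;
            long bytesToRead = Math.min(remaining, blockSize - targetStart);
            ranges.add(new long[] {blockIndex, targetStart, targetStart + bytesToRead - 1});
            remaining -= bytesToRead;
            position += bytesToRead;
        }
        return ranges;
    }
}
```

Using plain numbers in place of megabytes, plan(64, 320, 100, 128) yields three ranges: block index 1 from byte 36 to 63, block index 2 from 0 to 63, and block index 3 from 0 to 35, which matches the second, third and fourth blocks in the comments.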
In read, the block range is obtained by calling getBlockRange(position, realLen). Its implementation can be found at line 1991 of DFSClient.java:
private synchronized List<LocatedBlock> getBlockRange(long offset,
long length)
throws IOException {
assert (locatedBlocks != null) : "locatedBlocks is null";
List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
// search cached blocks first
// first look up, in the cached locatedBlocks, the index of the block that contains offset
int blockIdx = locatedBlocks.findBlock(offset);
if (blockIdx < 0) { // block is not cached
blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
}
long remaining = length;
long curOff = offset;
while(remaining > 0) {
LocatedBlock blk = null;
// fetch the block at index blockIdx
if(blockIdx < locatedBlocks.locatedBlockCount())
blk = locatedBlocks.get(blockIdx);
// if blk is null, the block is not in the cache: fetch the blocks from the NameNode and add them to the cache
if (blk == null || curOff < blk.getStartOffset()) {
LocatedBlocks newBlocks;
newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
continue;
}
assert curOff >= blk.getStartOffset() : "Block not found";
// the block was found, so add it to the result
blockRange.add(blk);
long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
remaining -= bytesRead;
curOff += bytesRead;
// move on to the next block
blockIdx++;
}
return blockRange;
}
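getBlockRange relies on two small operations on the cache: finding the cached block that contains an offset, and turning a "not found" result into the index where missing blocks should be inserted. The following is a sketch of that idea only, assuming the cache is kept sorted by start offset and that a negative result encodes the insertion point the same way java.util.Collections.binarySearch does. The class BlockCacheIndex and the {startOffset, size} representation are hypothetical.

```java
import java.util.List;

// Hypothetical helper, not part of Hadoop: models the cache lookup used by getBlockRange().
final class BlockCacheIndex {
    // cached holds {startOffset, size} pairs, sorted by startOffset and non-overlapping.
    // Returns the index of the block containing offset, or -(insertionPoint + 1) if none does.
    static int findBlock(List<long[]> cached, long offset) {
        int lo = 0;
        int hi = cached.size() - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            long start = cached.get(mid)[0];
            long end = start + cached.get(mid)[1];
            if (offset < start) {
                hi = mid - 1;
            } else if (offset >= end) {
                lo = mid + 1;
            } else {
                return mid; // offset falls inside this cached block
            }
        }
        return -(lo + 1);
    }

    // Converts a search result into the index at which a missing block would be inserted.
    static int getInsertIndex(int searchResult) {
        return searchResult >= 0 ? searchResult : -(searchResult + 1);
    }
}
```

With cached blocks covering [0, 64) and [128, 192), looking up offset 70 misses, and the insert index is 1: the missing block belongs between the two cached ones.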
Another key call inside read is fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1, buffer, offset); its implementation is at line 2291:
private void fetchBlockByteRange(LocatedBlock block, long start,
long end, byte[] buf, int offset) throws IOException {
//
// Connect to best DataNode for desired Block, with potential offset
//
Socket dn = null;
int refetchToken = 1; // only need to get a new access token once
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
block = getBlockAt(block.getStartOffset(), false);
// choose a datanode to read from
DNAddrPair retval = chooseDataNode(block);
DatanodeInfo chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
BlockReader reader = null;
int len = (int) (end - start + 1);
try {
Token<BlockTokenIdentifier> accessToken = block.getBlockToken();
// first try reading the block locally.
if (shouldTryShortCircuitRead(targetAddr)) {
try {
reader = getLocalBlockReader(conf, src, block.getBlock(),
accessToken, chosenNode, DFSClient.this.socketTimeout, start);
} catch (AccessControlException ex) {
LOG.warn("Short circuit access failed ", ex);
//Disable short circuit reads
shortCircuitLocalReads = false;
continue;
}
} else {
// the block cannot be read locally,
// go to the datanode
// create the socket connection
dn = socketFactory.createSocket();
NetUtils.connect(dn, targetAddr, socketTimeout);
dn.setSoTimeout(socketTimeout);
// use the socket connection to create a reader that reads data from the DataNode
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(), accessToken,
block.getBlock().getGenerationStamp(), start, len, buffersize,
verifyChecksum, clientName);
}
// read the data
int nread = reader.readAll(buf, offset, len);
if (nread != len) {
throw new IOException("truncated return from reader.read(): " +
"excpected " + len + ", got " + nread);
}
return;
} catch (ChecksumException e) {
LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
src + " at " + block.getBlock() + ":" +
e.getPos() + " from " + chosenNode.getName());
reportChecksumFailure(src, block.getBlock(), chosenNode);
} catch (IOException e) {
if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
refetchToken--;
fetchBlockAt(block.getStartOffset());
continue;
} else {
LOG.warn("Failed to connect to " + targetAddr + " for file " + src
+ " for block " + block.getBlock() + ":" + e);
if (LOG.isDebugEnabled()) {
LOG.debug("Connection failure ", e);
}
}
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
}
// Put chosen node into dead list, continue
// if the read failed, mark this DataNode as dead
addToDeadNodes(chosenNode);
}
}
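The overall shape of fetchBlockByteRange is a retry loop: pick a replica that is not known to be dead, try to read from it, and on failure add it to the dead list and try again. The following is a simplified sketch of that pattern only. The class ReplicaRetry and the ReplicaReader interface are hypothetical stand-ins, and the sketch gives up once every replica has failed, whereas chooseDataNode (not shown in this article) may behave differently.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of Hadoop: models the dead-node retry loop.
final class ReplicaRetry {
    // Stand-in for "connect to a datanode and read the requested byte range".
    interface ReplicaReader {
        byte[] read(String node) throws IOException;
    }

    static byte[] readFromAnyReplica(List<String> replicas, ReplicaReader reader)
            throws IOException {
        Set<String> deadNodes = new HashSet<>();
        while (true) {
            // Choose the first replica that has not failed yet.
            String chosen = null;
            for (String node : replicas) {
                if (!deadNodes.contains(node)) {
                    chosen = node;
                    break;
                }
            }
            if (chosen == null) {
                throw new IOException("No live replica left: " + replicas);
            }
            try {
                return reader.read(chosen);
            } catch (IOException e) {
                // The read failed: remember the node as dead and go around the loop again.
                deadNodes.add(chosen);
            }
        }
    }
}
```

Given the replicas dn1 and dn2, where reading from dn1 throws an IOException, the sketch tries dn1 first, marks it dead, and then returns the data read from dn2.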
In fetchBlockByteRange, the statement
reader = BlockReader.newBlockReader(dn, src,
block.getBlock().getBlockId(), accessToken,
block.getBlock().getGenerationStamp(), start, len, buffersize,
verifyChecksum, clientName);
calls BlockReader.newBlockReader; see line 1689 of DFSClient.java:
public static BlockReader newBlockReader( Socket sock, String file,
long blockId,
Token<BlockTokenIdentifier> accessToken,
long genStamp,
long startOffset, long len,
int bufferSize, boolean verifyChecksum,
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
// use the socket to create an output stream for sending the read request to the DataNode
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
//write the header.
out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
out.write( DataTransferProtocol.OP_READ_BLOCK );
out.writeLong( blockId );
out.writeLong( genStamp );
out.writeLong( startOffset );
out.writeLong( len );
Text.writeString(out, clientName);
accessToken.write(out);
out.flush();
//
// Get bytes in block, set streams
//
// use the socket to create an input stream for reading data from the DataNode
DataInputStream in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(sock),
bufferSize));
short status = in.readShort();
if (status != DataTransferProtocol.OP_STATUS_SUCCESS) {
if (status == DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN) {
throw new InvalidBlockTokenException(
"Got access token error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for block " + blockId + "_" + genStamp);
} else {
throw new IOException("Got error for OP_READ_BLOCK, self="
+ sock.getLocalSocketAddress() + ", remote="
+ sock.getRemoteSocketAddress() + ", for file " + file
+ ", for block " + blockId + "_" + genStamp);
}
}
DataChecksum checksum = DataChecksum.newDataChecksum( in );
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
long firstChunkOffset = in.readLong();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
throw new IOException("BlockReader: error in first chunk offset (" +
firstChunkOffset + ") startOffset is " +
startOffset + " for file " + file);
}
// create a reader, which mainly wraps the input stream, for reading the data
return new BlockReader( file, blockId, in, checksum, verifyChecksum,
startOffset, firstChunkOffset, sock );
}
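The request header that newBlockReader writes is a fixed sequence of fields, which is easy to see when it is written into a byte array instead of a socket. The following is an illustrative sketch only: the class ReadRequestHeader is hypothetical, the numeric values of VERSION and OP_READ_BLOCK are placeholders (the real constants are defined in DataTransferProtocol), writeUTF stands in for Text.writeString and encodes the string differently, and the access token is left out.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical helper, not part of Hadoop: shows the field order of the read request.
final class ReadRequestHeader {
    static final short VERSION = 17;      // placeholder value, an assumption
    static final byte OP_READ_BLOCK = 81; // placeholder value, an assumption

    static byte[] encode(long blockId, long genStamp, long startOffset, long len,
                         String clientName) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(VERSION);    // 2 bytes: protocol version
        out.write(OP_READ_BLOCK);   // 1 byte: opcode
        out.writeLong(blockId);     // 8 bytes
        out.writeLong(genStamp);    // 8 bytes
        out.writeLong(startOffset); // 8 bytes
        out.writeLong(len);         // 8 bytes
        out.writeUTF(clientName);   // stand-in for Text.writeString (different encoding)
        out.flush();
        return bytes.toByteArray();
    }
}
```

The fixed part of the header in this sketch is 35 bytes; the client name follows it.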
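2.2 DataNode
The startup code of DataNode (startDataNode) begins at line 320 of its source file. Pay particular attention to lines 399-419, which initialize the socket server: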
void startDataNode(Configuration conf,
AbstractList<File> dataDirs, SecureResources resources
) throws IOException
{
........
// find free port or use privileged port provide
ServerSocket ss;
if(secureResources == null) {
// create the ServerSocket
ss = (socketWriteTimeout > 0) ?
ServerSocketChannel.open().socket() : new ServerSocket();
Server.bind(ss, socAddr, 0);
} else {
ss = resources.getStreamingSocket();
}
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
// adjust machine name with the actual port
tmpPort = ss.getLocalPort();
selfAddr = new InetSocketAddress(ss.getInetAddress().getHostAddress(),
tmpPort);
this.dnRegistration.setName(machineName + ":" + tmpPort);
LOG.info("Opened info server at " + tmpPort);
this.threadGroup = new ThreadGroup("dataXceiverServer");
this.dataXceiverServer = new Daemon(threadGroup,
new DataXceiverServer(ss, conf, this));
this.threadGroup.setDaemon(true); // auto destroy when empty
......
}
The run() method of DataXceiverServer starts at line 128:
public void run() {
while (datanode.shouldRun) {
try {
// accept a client connection
Socket s = ss.accept();
s.setTcpNoDelay(true);
// spawn a DataXceiver thread to serve the new connection
new Daemon(datanode.threadGroup,
new DataXceiver(s, datanode, this)).start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer:"
+ StringUtils.stringifyException(ace));
datanode.shouldRun = false;
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: IOException due to:"
+ StringUtils.stringifyException(ie));
} catch (Throwable te) {
LOG.error(datanode.dnRegistration + ":DataXceiveServer: Exiting due to:"
+ StringUtils.stringifyException(te));
datanode.shouldRun = false;
}
}
try {
ss.close();
} catch (IOException ie) {
LOG.warn(datanode.dnRegistration + ":DataXceiveServer: Close exception due to: "
+ StringUtils.stringifyException(ie));
}
LOG.info("Exiting DataXceiveServer");
}
In DataXceiver, at line 77, look at run():
public void run() {
DataInputStream in=null;
try {
// create an input stream to read the command sent by the client
in = new DataInputStream(
new BufferedInputStream(NetUtils.getInputStream(s),
SMALL_BUFFER_SIZE));
short version = in.readShort();
if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
throw new IOException( "Version Mismatch" );
}
boolean local = s.getInetAddress().equals(s.getLocalAddress());
byte op = in.readByte();
// Make sure the xciver count is not exceeded
int curXceiverCount = datanode.getXceiverCount();
if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
throw new IOException("xceiverCount " + curXceiverCount
+ " exceeds the limit of concurrent xcievers "
+ dataXceiverServer.maxXceiverCount);
}
long startTime = DataNode.now();
switch ( op ) {
// read request
case DataTransferProtocol.OP_READ_BLOCK:
readBlock( in );
datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
if (local)
datanode.myMetrics.incrReadsFromLocalClient();
else
datanode.myMetrics.incrReadsFromRemoteClient();
break;
case DataTransferProtocol.OP_WRITE_BLOCK:
writeBlock( in );
datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
if (local)
datanode.myMetrics.incrWritesFromLocalClient();
else
datanode.myMetrics.incrWritesFromRemoteClient();
break;
case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
replaceBlock(in);
datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_COPY_BLOCK:
// for balancing purpose; send to a proxy source
copyBlock(in);
datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
break;
case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
getBlockChecksum(in);
datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
break;
default:
throw new IOException("Unknown opcode " + op + " in data stream");
}
} catch (Throwable t) {
LOG.error(datanode.dnRegistration + ":DataXceiver",t);
} finally {
LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
+ datanode.getXceiverCount());
IOUtils.closeStream(in);
IOUtils.closeSocket(s);
dataXceiverServer.childSockets.remove(s);
}
}
private void readBlock(DataInputStream in) throws IOException {
//
// Read in the header
//
long blockId = in.readLong();
Block block = new Block( blockId, 0 , in.readLong());
long startOffset = in.readLong();
long length = in.readLong();
String clientName = Text.readString(in);
// read the access token, then create an output stream for writing data to the client
Token<BlockTokenIdentifier> accessToken = new Token<BlockTokenIdentifier>();
accessToken.readFields(in);
OutputStream baseStream = NetUtils.getOutputStream(s,
datanode.socketWriteTimeout);
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
if (datanode.isBlockTokenEnabled) {
try {
datanode.blockTokenSecretManager.checkAccess(accessToken, null, block,
BlockTokenSecretManager.AccessMode.READ);
} catch (InvalidToken e) {
try {
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR_ACCESS_TOKEN);
out.flush();
throw new IOException("Access token verification failed, for client "
+ remoteAddress + " for OP_READ_BLOCK for block " + block);
} finally {
IOUtils.closeStream(out);
}
}
}
// send the block
BlockSender blockSender = null;
final String clientTraceFmt =
clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
"%d", "HDFS_READ", clientName, "%d",
datanode.dnRegistration.getStorageID(), block, "%d")
: datanode.dnRegistration + " Served block " + block + " to " +
s.getInetAddress();
try {
try {
// create a BlockSender to read the block's data from local storage and send it to the client
// BlockSender has a member variable InputStream blockIn for reading the local block data
blockSender = new BlockSender(block, startOffset, length,
true, true, false, datanode, clientTraceFmt);
} catch(IOException e) {
out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
throw e;
}
// write the data to the client
out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
long read = blockSender.sendBlock(out, baseStream, null); // send data
if (blockSender.isBlockReadFully()) {
// See if client verification succeeded.
// This is an optional response from client.
try {
if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK &&
datanode.blockScanner != null) {
datanode.blockScanner.verifiedByClient(block);
}
} catch (IOException ignored) {}
}
datanode.myMetrics.incrBytesRead((int) read);
datanode.myMetrics.incrBlocksRead();
} catch ( SocketException ignored ) {
// Its ok for remote side to close the connection anytime.
datanode.myMetrics.incrBlocksRead();
} catch ( IOException ioe ) {
/* What exactly should we do here?
* Earlier version shutdown() datanode if there is disk error.
*/
LOG.warn(datanode.dnRegistration + ":Got exception while serving " +
block + " to " +
s.getInetAddress() + ":\n" +
StringUtils.stringifyException(ioe) );
throw ioe;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(blockSender);
}
}
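On the DataNode side, run() and readBlock() together parse the header that the client wrote: version first, then the opcode, then the fields of the read request. The following is a rough sketch of that parsing and dispatch, reading from any DataInputStream instead of a socket. The class OpDispatchSketch is hypothetical, the constant values are placeholders rather than the real DataTransferProtocol values, and the client name, access token and actual block transfer are omitted.

```java
import java.io.DataInputStream;
import java.io.IOException;

// Hypothetical helper, not part of Hadoop: models header parsing and opcode dispatch.
final class OpDispatchSketch {
    static final short VERSION = 17;       // placeholder value, an assumption
    static final byte OP_WRITE_BLOCK = 80; // placeholder value, an assumption
    static final byte OP_READ_BLOCK = 81;  // placeholder value, an assumption

    // Parses one request header and returns a description of what would be served.
    static String handle(DataInputStream in) throws IOException {
        short version = in.readShort();
        if (version != VERSION) {
            throw new IOException("Version Mismatch");
        }
        byte op = in.readByte();
        switch (op) {
            case OP_READ_BLOCK: {
                // Same field order as the client side: blockId, genStamp, startOffset, length.
                long blockId = in.readLong();
                long genStamp = in.readLong();
                long startOffset = in.readLong();
                long length = in.readLong();
                return "read block " + blockId + "_" + genStamp
                        + " start=" + startOffset + " length=" + length;
            }
            case OP_WRITE_BLOCK:
                return "write block";
            default:
                throw new IOException("Unknown opcode " + op + " in data stream");
        }
    }
}
```

Reader and writer have to agree on the field order exactly; the stream carries no field names, so a mismatch shows up only as wrong values or a truncated read.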
III. UML sequence diagrams
Open process
Read process