Tomcat原始碼分析之:ServletOutputStream的實現
貌似很久都沒有寫部落格了,tomcat8的程式碼已經看了很多,主體部分的程式碼也都看得差不多了,發現在tomcat8中已經完全支援非阻塞的方式接收以及傳送資料了。。。。但是比較遺憾的是,以前遺留下來的太多的老程式碼都不支援這種新的方式來發送資料。。。木有辦法。。。
這裡來看看Tomcat中是如何實現ServletOutputStream的吧。。。。
在具體的來看它之前,這裡先來一張圖來描述一下Tomcat的資料傳送時候的流動。。。
這張圖形已經比較好的展現成了Tomcat中對IO這一塊封裝的層次關係吧,首先是最上層的
ServletOutputStream物件,它是使用者程式碼可以接觸到的物件,它自帶了自己的OutputBuffer,這裡寫資料其實是通過這個這個buffer完成的,資料是寫到這個OutputBuffer裡面
接下來的一層就是Tomcat內部的Response型別了,每一個ServletResponse物件都對應著一個唯一的Tomcat內部的response型別,當然了,他也有自己的buffer,InternalNioOutputBuffer型別的物件,這裡上層ServletOutputStream寫資料將會流動到這裡。。。
最下層就是與channel關聯的部分了,它也有自己的buffer,這裡一般就是java類庫中的niobuffer,上層的資料將會流動到這裡。。。
最後才是將資料通過channel傳送出去。。。
嗯,雖然好像層次稍微多了些,而且剛剛開始看程式碼就覺得稍微有點繁瑣。。不過當將這個層次關係理清楚了之後還是蠻簡單的。。。
好啦,接下來來看程式碼了。。。
這裡就通過一次write來跟蹤整個程式碼的執行軌跡吧,首先來看看CoyoteOutputStream型別的write方法:
//傳送一個byte陣列的一部分資料 public void write(byte[] b, int off, int len) throws IOException { boolean nonBlocking = checkNonBlockingWrite(); //判斷是否支援非阻塞的傳送資料,這裡判斷的標準其實是使用者程式碼是否加入了WriteListener ob.write(b, off, len); //將資料寫到outputbuffer裡面 if (nonBlocking) { checkRegisterForWrite(); //如果是非阻塞的傳送資料的話,需要確保channel有註冊寫事件 } }
這裡首先要判斷是否支援非阻塞的傳送資料,這裡就不細講這部分的類容了,,,來看看這裡的outputbuffer的write行為吧:
//寫入一個位元組陣列的一部分
public void write(byte b[], int off, int len) throws IOException {
if (suspended) {
return;
}
writeBytes(b, off, len);
}
private void writeBytes(byte b[], int off, int len)
throws IOException {
if (closed) { //如果已經關閉了,那麼直接返回吧
return;
}
//這裡其實是寫到buffer裡,如果資料過大,也有可能呼叫realWriteBytes方法真正的呼叫底層寫資料
bb.append(b, off, len);
bytesWritten += len;
// if called from within flush(), then immediately flush
// remaining bytes
if (doFlush) {
bb.flushBuffer();
}
}
這裡其實就是將資料直接放到當年的bytechunk就好了。。。但是這裡如果資料比較多的話,會將資料直接寫到下一層,也就是tomcat內建的response。。。來看看bytechunk的append方法吧:
//加入一個位元組陣列的一部分資料
public void append( byte src[], int off, int len )
throws IOException
{
// will grow, up to limit
makeSpace( len ); //確保有這麼多空間可以寫
// if we don't have limit: makeSpace can grow as it wants
if( limit < 0 ) { //表示沒有空間限制
// assert: makeSpace made enough space
System.arraycopy( src, off, buff, end, len ); //將資料複製過來
end+=len; //將end偏移加上len
return;
}
// Optimize on a common case.
// If the buffer is empty and the source is going to fill up all the
// space in buffer, may as well write it directly to the output,
// and avoid an extra copy
//如果一次就填滿了,那麼還是寫出去好了
if ( len == limit && end == start && out != null ) {
out.realWriteBytes( src, off, len ); //因為要寫的資料比較大,所以直接寫到更下層去
return;
}
// if we have limit and we're below
if( len <= limit - end ) { //表示還有足夠的空間可以寫資料
// makeSpace will grow the buffer to the limit,
// so we have space
System.arraycopy( src, off, buff, end, len );
end+=len;
return;
}
// need more space than we can afford, need to flush
// buffer
// the buffer is already at ( or bigger than ) limit
// We chunk the data into slices fitting in the buffer limit, although
// if the data is written directly if it doesn't fit
//程式碼執行到這裡,說明空間不夠了
int avail=limit-end; //還剩下多大的空間可以寫資料
System.arraycopy(src, off, buff, end, avail);
end += avail;
flushBuffer();
int remain = len - avail;
while (remain > (limit - end)) { //不斷的嘗試寫資料到下面去
out.realWriteBytes( src, (off + len) - remain, limit - end );
remain = remain - (limit - end);
}
System.arraycopy(src, (off + len) - remain, buff, end, remain);
end += remain;
}
這裡可以看到realWriteBytes方法,它其實就是外面的outputbuffer定義的方法,來看看吧:
//這個才是真正的呼叫底層傳送資料,其實又是呼叫tomcat的response來寫資料
public void realWriteBytes(byte buf[], int off, int cnt)
throws IOException {
if (closed) {
return;
}
if (coyoteResponse == null) {
return;
}
// If we really have something to write
if (cnt > 0) {
// real write to the adapter
outputChunk.setBytes(buf, off, cnt); //設定outputchunk
try {
coyoteResponse.doWrite(outputChunk); //通過tomcat的response來寫資料,其實是寫到httpprocessor的buffer裡面去了
} catch (IOException e) {
// An IOException on a write is almost always due to
// the remote client aborting the request. Wrap this
// so that it can be handled better by the error dispatcher.
throw new ClientAbortException(e);
}
}
}
嗯,這裡就與上面的層次對應上了吧,其實就是寫到tomcat內建的response。。好了,接下來繼續。。
//在servlet的outputStream可能會呼叫這個方法來寫資料
public void doWrite(ByteChunk chunk/*byte buffer[], int pos, int count*/)
throws IOException
{
outputBuffer.doWrite(chunk, this); //呼叫在httpprocessor裡面建立的outputbuffer來寫資料,這裡會將資料寫到niochannel的buffer,然後最終傳送出去
contentWritten+=chunk.getLength(); //標記已經發送的資料量的大小
}
嗯,這裡其實是寫到internalNiobuffer裡去。。。。繼續看吧:
public int doWrite(ByteChunk chunk, Response res) throws IOException {
int len = chunk.getLength();
int start = chunk.getStart();
byte[] b = chunk.getBuffer();
addToBB(b, start, len);
byteCount += chunk.getLength();
return chunk.getLength();
}
嗯,沒啥意思,繼續:
private synchronized void addToBB(byte[] buf, int offset, int length)
throws IOException {
if (length == 0) return;
// Try to flush any data in the socket's write buffer first
//首先嚐試先將資料傳送出去
boolean dataLeft = flushBuffer(isBlocking());
// Keep writing until all the data is written or a non-blocking write
// leaves data in the buffer
//這裡只有在緩衝區裡面已經沒有資料了才繼續傳送
while (!dataLeft && length > 0) {
//首先將要傳送的資料copy到niochanel的傳送buffer裡面去
int thisTime = transfer(buf,offset,length,socket.getBufHandler().getWriteBuffer());
length = length - thisTime; //計算還剩下多少位元組沒有寫到niochannel的buffer裡面,其實這裡也就當做將資料轉移到了niochannel的buffer就算是寫出去了
offset = offset + thisTime; //這裡用於調整偏移量
//這裡呼叫writeToSocket方法將niochannel的buffer的裡面的資料通過socket寫出去
int written = writeToSocket(socket.getBufHandler().getWriteBuffer(),
isBlocking(), true); //如果在tomcat的response裡面有writelistener的話,可以非同步的寫
if (written == 0) { //都沒有寫出去位元組
dataLeft = true;
} else {
dataLeft = flushBuffer(isBlocking()); //flush一下,看一下是否還會有資料剩餘
}
}
NioEndpoint.KeyAttachment ka = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if (ka != null) ka.access();//prevent timeouts for just doing client writes
if (!isBlocking() && length > 0) { //在非阻塞的傳送中,如果實在傳送不出去,需要儲存在額外的buffer裡面
// Remaining data must be buffered
addToBuffers(buf, offset, length);
}
}
這裡其實主要是呼叫flushBuffer方法,將資料傳給下層的niochannel,而且可以看到對於過多的資料這裡還會 做一層快取。。。。
//這裡其實是flush niochannel的buffer
protected boolean flushBuffer(boolean block) throws IOException {
//prevent timeout for async,
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
if (key != null) {
NioEndpoint.KeyAttachment attach = (NioEndpoint.KeyAttachment) key.attachment();
attach.access();
}
boolean dataLeft = hasMoreDataToFlush(); //其實這裡是判斷niochannel的buffer裡面是否還有資料要寫
//write to the socket, if there is anything to write
if (dataLeft) { //如果niochannel的buffer裡面還有資料傳送,那麼繼續寫
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, !flipped);
}
dataLeft = hasMoreDataToFlush();
//這裡如果niochannel的buffer裡面的資料已經發送完了,那麼將將以前緩衝的資料再發送出去
if (!dataLeft && bufferedWrites.size() > 0) {
Iterator<ByteBufferHolder> bufIter = bufferedWrites.iterator(); //遍歷待發送的資料
while (!hasMoreDataToFlush() && bufIter.hasNext()) {
ByteBufferHolder buffer = bufIter.next();
buffer.flip();
while (!hasMoreDataToFlush() && buffer.getBuf().remaining()>0) {
transfer(buffer.getBuf(), socket.getBufHandler().getWriteBuffer());
if (buffer.getBuf().remaining() == 0) { //如果當前buffer裡面的所有資料都已經轉移到了niochannel的buffer裡面,那麼可以將這個buffer移除了
bufIter.remove();
}
writeToSocket(socket.getBufHandler().getWriteBuffer(),block, true);
//here we must break if we didn't finish the write
}
}
}
return hasMoreDataToFlush();
}
嗯,這裡其實主要是呼叫writeToSocket方法。。。來看看吧:
//這裡其實呼叫socket來寫資料
private synchronized int writeToSocket(ByteBuffer bytebuffer, boolean block, boolean flip) throws IOException {
if ( flip ) {
bytebuffer.flip();
flipped = true;
}
int written = 0;
NioEndpoint.KeyAttachment att = (NioEndpoint.KeyAttachment)socket.getAttachment(false);
if ( att == null ) throw new IOException("Key must be cancelled");
long writeTimeout = att.getWriteTimeout();
Selector selector = null;
try {
selector = pool.get();
} catch ( IOException x ) {
//ignore
}
try {
written = pool.write(bytebuffer, socket, selector, writeTimeout, block);
//make sure we are flushed
do {
//對於niochanel,這個flush方法其實是沒用的
if (socket.flush(true,selector,writeTimeout)) break;
}while ( true );
}finally {
if ( selector != null ) {
pool.put(selector);
}
}
if ( block || bytebuffer.remaining()==0) {
//blocking writes must empty the buffer
//and if remaining==0 then we did empty it
bytebuffer.clear();
flipped = false;
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
return written;
}
這裡因為涉及到了一些阻塞的或者非阻塞的傳送資料。。所以可能會用到selector。。。
public int write(ByteBuffer buf, NioChannel socket, Selector selector,
long writeTimeout, boolean block) throws IOException {
if ( SHARED && block ) { //對於寫資料,一般都是這裡
return blockingSelector.write(buf,socket,writeTimeout);
}
//但是如果有outputstream的listener的話,可以採用非阻塞的方式來發送大量的資料
SelectionKey key = null;
int written = 0;
boolean timedout = false;
int keycount = 1; //assume we can write //假裝剛開始是可以寫的
long time = System.currentTimeMillis(); //start the timeout timer
try {
while ( (!timedout) && buf.hasRemaining() ) {
int cnt = 0;
if ( keycount > 0 ) { //only write if we were registered for a write
cnt = socket.write(buf); //write the data
if (cnt == -1) throw new EOFException(); //出錯了
written += cnt;
if (cnt > 0) {
time = System.currentTimeMillis(); //reset our timeout timer
continue; //we successfully wrote, try again without a selector
}
if (cnt==0 && (!block)) { //這裡對於非阻塞的寫,就直接返回了
break; //don't block
}
}
if ( selector != null ) {
//register OP_WRITE to the selector
if (key==null) key = socket.getIOChannel().register(selector, SelectionKey.OP_WRITE);
else key.interestOps(SelectionKey.OP_WRITE);
if (writeTimeout==0) {
timedout = buf.hasRemaining();
} else if (writeTimeout<0) {
keycount = selector.select();
} else {
keycount = selector.select(writeTimeout);
}
}
if (writeTimeout > 0 && (selector == null || keycount == 0) ) timedout = (System.currentTimeMillis()-time)>=writeTimeout;
}//while
if ( timedout ) throw new SocketTimeoutException();
} finally {
if (key != null) {
key.cancel();
if (selector != null) selector.selectNow();//removes the key from this selector
}
}
return written;
}
這裡如果就直接呼叫niochannel來發送資料了。。不過其實這裡還會涉及到將資料轉移到niochannel的buffer。。然後才傳送資料。。。
到這裡整個資料的流動層次對照著上面的圖形應該就算是 比較明白了吧。。。。