tomcat原始碼解讀四 tomcat中的processer
Processor是一個介面,針對於不同協議下具有不同的具體實現類,其實現類的具體功能是處理http請求,主要是對協議進行解析,狀態處理以及響應。然後起一箇中間作用轉發到 Adater,下面是其類的關係圖
其實現類中我們常用的http協議,所以一般是左邊的部分,用紅線標註
1.1 迴圈佇列
protected static class RecycledProcessors<P extends Processor<S>, S> extends SynchronizedStack<Processor<S>> { private final transient AbstractConnectionHandler<S,P> handler; protected final AtomicInteger size = new AtomicInteger(0); public RecycledProcessors(AbstractConnectionHandler<S,P> handler) { this.handler = handler; } @SuppressWarnings("sync-override") // Size may exceed cache size a bit @Override public boolean push(Processor<S> processor) { //獲取Processor能夠快取的大小 int cacheSize = handler.getProtocol().getProcessorCache(); boolean offer = cacheSize == -1 ? true : size.get() < cacheSize; //向棧中壓入當前processor boolean result = false; if (offer) { result = super.push(processor); if (result) { size.incrementAndGet(); } } //取消當前processor例項的JMX if (!result) handler.unregister(processor); return result; } @SuppressWarnings("sync-override") // OK if size is too big briefly @Override public Processor<S> pop() { Processor<S> result = super.pop(); if (result != null) { size.decrementAndGet(); } return result; } @Override public synchronized void clear() { Processor<S> next = pop(); while (next != null) { handler.unregister(next); next = pop(); } super.clear(); size.set(0); } }
在講述Processor的獲取以及處理過程之前先看一個類,姑且命名為迴圈佇列, 它主要是繼承了SynchronizedStack這個棧(tomcat自己實現)裡面實現了進棧出棧兩種方法。
1.1 Processor的建立
根據棧中執行的流程可以看出呼叫的是協議控制代碼的抽象類中的process方法,所以針對於四種模式其實現過程大致相同,具體程式碼如下:
public SocketState process(SocketWrapper<S> wrapper, SocketStatus status) { //如果socketWrapper為空則證明不存在socket則直接將狀態設定為CLOSED if (wrapper == null) { return SocketState.CLOSED; } //獲取當前SocketWrapper例項對應的NIO通道 S socket = wrapper.getSocket(); if (socket == null) { //什麼也不做 socket已經關閉 return SocketState.CLOSED; } /** * 從connections中根據socket獲取Processor,如果沒有則在下面建立 connections控制代碼型別Map<S,Processor<S>> * 在以下情況下connections中存在值 * 1.websocket中 * 2.非同步servlet * 3.傳送檔案 * */ Processor<S> processor = connections.get(socket); if (status == SocketStatus.DISCONNECT && processor == null) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } wrapper.setAsync(false); //標記當前執行緒是否是容器執行緒 set則是容器執行緒 ContainerThreadMarker.set(); /** * * 建立一個Http11NioProcessor 例項裡面構造了request 和response成員變數 * 各封裝了一個InternalNioInputBuffer例項 * 其中request中封裝了成員屬性名inputBuffer * response中封裝了成員屬性名outputBuffer * */ try { if (processor == null) { processor = recycledProcessors.pop(); } if (processor == null) { processor = createProcessor(); } initSsl(wrapper, processor); SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { // Associate the processor with the connection as // these calls may result in a nested call to process() connections.put(socket, processor); DispatchType nextDispatch = dispatches.next(); if (processor.isUpgrade()) { state = processor.upgradeDispatch( nextDispatch.getSocketStatus()); } else { state = processor.asyncDispatch( nextDispatch.getSocketStatus()); } } else if (processor.isComet()) { state = processor.event(status); } else if (processor.isUpgrade()) { state = processor.upgradeDispatch(status); } else if (status == SocketStatus.DISCONNECT) { // Comet and upgrade need to see DISCONNECT but the // others don't. NO-OP and let socket close. } else if (processor.isAsync() || state == SocketState.ASYNC_END) { state = processor.asyncDispatch(status); if (state == SocketState.OPEN) { // release() won't get called so in case this request // takes a long time to process, remove the socket from // the waiting requests now else the async timeout will // fire getProtocol().endpoint.removeWaitingRequest(wrapper); // There may be pipe-lined data to read. If the data // isn't processed now, execution will exit this // loop and call release() which will recycle the // processor (and input buffer) deleting any // pipe-lined data. To avoid this, process it now. state = processor.process(wrapper); } } else if (status == SocketStatus.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } else { //這個是在第一次請求的時候執行 state = processor.process(wrapper); } //根據非同步asyncStateMachine的狀態設定Socket的狀態 if (state != SocketState.CLOSED && processor.isAsync()) { state = processor.asyncPostProcess(); } if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); // Retrieve leftover input ByteBuffer leftoverInput = processor.getLeftoverInput(); // Release the Http11 processor to be re-used release(wrapper, processor, false, false); // Create the upgrade processor processor = createUpgradeProcessor( wrapper, leftoverInput, upgradeToken); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } } if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + wrapper + "], Status in: [" + status + "], State out: [" + state + "]"); } if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = wrapper.getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || state == SocketState.UPGRADING || dispatches != null && state != SocketState.CLOSED); if (state == SocketState.LONG) { // In the middle of processing a request/response. Keep the // socket associated with the processor. Exact requirements // depend on type of long poll //非同步在第一次處理的時候會將其設定到當前connections中去 connections.put(socket, processor); longPoll(wrapper, processor); } else if (state == SocketState.OPEN) { // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. connections.remove(socket); release(wrapper, processor, false, true); } else if (state == SocketState.SENDFILE) { // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket either be added to the // poller (or equivalent) to await more data or processed // if there are any pipe-lined requests remaining. connections.put(socket, processor); } else if (state == SocketState.UPGRADED) { // Don't add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. The write() method will add this socket // to the poller if necessary. if (status != SocketStatus.OPEN_WRITE) { longPoll(wrapper, processor); } } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. connections.remove(socket); if (processor.isUpgrade()) { UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); InstanceManager instanceManager = upgradeToken.getInstanceManager(); if (instanceManager == null) { httpUpgradeHandler.destroy(); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.destroy(); } finally { try { instanceManager.destroyInstance(httpUpgradeHandler); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } upgradeToken.getContextBind().unbind(false, oldCL); } } } else { release(wrapper, processor, true, false); } } return state; } catch(java.net.SocketException e) { // SocketExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.ioexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as // above. catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. getLog().error( sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); } // Make sure socket/processor is removed from the list of current // connections connections.remove(socket); // Don't try to add upgrade processors back into the pool if (processor !=null && !processor.isUpgrade()) { release(wrapper, processor, true, false); } return SocketState.CLOSED; }
從程式碼中可以看出獲取Processor共經過三種途徑,首先在connections這個map根據socket找到對應的Processor例項,也許你會有疑惑socket為什麼會相同,目前我知道的有基於長連線和Upgrade來實現的socket,這樣就有效的保留其中的協議狀態,以及部分請求資料。如果從其中並沒有獲取則在迴圈佇列中獲取(下文講述迴圈佇列),這相當於從棧中獲取元素,這是因為當一個例項化後的Processor處理完之後,並不會回收,而是釋放存入棧中供下次來可以直接進行使用,如果棧中不存在則自己再例項化一個。由這種方式可以看出其例項化跟瀏覽器的請求沒有多大關係,在一次會話中可能使用不同的,在不同會話中也可能使用相同的Processor
1.3 Processor的釋放
在當前socket處理完之後,會將Processor給釋放,在這裡將其部分控制代碼給重置之後,然後就壓入迴圈佇列供下次使用,其具體處理過程在BIO NIO 和AIO中有所出入
protected abstract void release(SocketWrapper<S> socket,
Processor<S> processor, boolean socketClosing,
boolean addToPoller);