Tomcat NIO (10): Key Classes of the IO Thread
阿新 • Published: 2020-12-19
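In the previous article we covered the overall call flow of the Tomcat IO thread and summarized the core logic of the key classes SocketProcessor and ConnectionHandler. Here we cover the remaining core classes: AbstractProcessorLight, Http11Processor, and CoyoteAdapter.
The core logic of AbstractProcessorLight is as follows: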
    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper +
                            "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
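- The core method of this class is process(), which is called by the process() method of the ConnectionHandler instance introduced in the previous article.
- Depending on the socket event and on whether asynchronous processing is in use, the method takes different branches and returns the expected SocketState. Here we cover only the normal, non-async call path.
- In the normal non-async case the SocketEvent is OPEN_READ, and control passes to the service() method of the Http11Processor instance.
- For any other unrecognized SocketEvent, the SocketState returned to the ConnectionHandler instance is CLOSED. As described in earlier articles, this result causes SocketProcessor to close the underlying socket.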
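The non-async branch selection described above can be sketched in isolation. This is a simplified illustration, not Tomcat's code: the enums are reduced stand-ins for Tomcat's SocketEvent and SocketState, the class name ProcessDispatchSketch is made up, and the service supplier is a hypothetical placeholder for Http11Processor.service(socketWrapper). The async, upgrade, and dispatch-iterator branches are deliberately left out.

```java
import java.util.function.Supplier;

final class ProcessDispatchSketch {

    // Reduced stand-ins for Tomcat's enums (assumption: only a subset of values).
    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }
    enum SocketState { OPEN, LONG, CLOSED }

    /**
     * Mirrors only the non-async, non-upgrade branches of process().
     * 'service' is a hypothetical stand-in for Http11Processor.service().
     */
    static SocketState process(SocketEvent status, Supplier<SocketState> service) {
        SocketState state = SocketState.CLOSED;
        if (status == SocketEvent.DISCONNECT) {
            // Nothing to do: the state stays CLOSED and the processor is recycled later.
        } else if (status == SocketEvent.OPEN_WRITE) {
            // Extra write event, ignored: keep the connection waiting.
            state = SocketState.LONG;
        } else if (status == SocketEvent.OPEN_READ) {
            // Normal path: hand the request to the HTTP processor.
            state = service.get();
        } else {
            // Any event that does not fit the processor's state closes the socket.
            state = SocketState.CLOSED;
        }
        return state;
    }

    public static void main(String[] args) {
        for (SocketEvent event : SocketEvent.values()) {
            System.out.println(event + " -> " + process(event, () -> SocketState.OPEN));
        }
    }
}
```

Only OPEN_READ ever reaches the service call in this sketch; every other event is resolved without touching the request.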
The core code of Http11Processor is as follows:
    // AbstractProcessor
    public AbstractProcessor(Adapter adapter) {
        this(adapter, new Request(), new Response());
    }

    // Http11Processor
    public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
        super(adapter);
        this.protocol = protocol;

        httpParser = new HttpParser(protocol.getRelaxedPathChars(),
                protocol.getRelaxedQueryChars());

        inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
                protocol.getRejectIllegalHeaderName(), httpParser);
        request.setInputBuffer(inputBuffer);

        outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
        response.setOutputBuffer(outputBuffer);

        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new IdentityOutputFilter());

        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
                protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
                protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new ChunkedOutputFilter());

        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());

        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());

        // Create and add the chunked filters.
        //inputBuffer.addFilter(new GzipInputFilter());
        outputBuffer.addFilter(new GzipOutputFilter());

        pluggableFilterIndex = inputBuffer.getFilters().length;
    }

    public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        setSocketWrapper(socketWrapper);
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);

        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;

        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
                sendfileState == SendfileState.DONE && !protocol.isPaused()) {

            // Parsing the request header
            try {
                if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                        protocol.getKeepAliveTimeout())) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (protocol.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                    if (!inputBuffer.parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!protocol.getDisableUploadTimeout()) {
                        socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                    }
                }
            } catch (IOException e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), e);
                }
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString("http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString("http11processor.fallToDebug");
                            // fall through
                        case INFO:
                            log.info(message, t);
                            break;
                        case DEBUG:
                            log.debug(message, t);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }

            // Has an upgrade been requested?
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                        Locale.ENGLISH).contains("upgrade");
            }

            if (foundUpgrade) {
                // Check the protocol
                String requestedProtocol = request.getHeader("Upgrade");

                UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
                if (upgradeProtocol != null) {
                    if (upgradeProtocol.accept(request)) {
                        response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                        response.setHeader("Connection", "Upgrade");
                        response.setHeader("Upgrade", requestedProtocol);
                        action(ActionCode.CLOSE, null);
                        getAdapter().log(request, response, 0);

                        InternalHttpUpgradeHandler upgradeHandler =
                                upgradeProtocol.getInternalUpgradeHandler(
                                        socketWrapper, getAdapter(), cloneRequest(request));
                        UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                        action(ActionCode.UPGRADE, upgradeToken);
                        return SocketState.UPGRADING;
                    }
                }
            }

            if (getErrorState().isIoAllowed()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }

            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // Process the request in the adapter
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred. Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if (keepAlive && !getErrorState().isError() && !isAsync() &&
                            statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                } catch (HeadersTooLargeException e) {
                    log.error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }

            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // If this is an async request then the request ends when it has
                // been completed. The AsyncContext is responsible for calling
                // endRequest() in that case.
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

            // If there was an error, make sure the request is counted as
            // and error, and update the statistics counter
            if (getErrorState().isError()) {
                response.setStatus(500);
            }

            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }

            if (!protocol.getDisableUploadTimeout()) {
                int connectionTimeout = protocol.getConnectionTimeout();
                if (connectionTimeout > 0) {
                    socketWrapper.setReadTimeout(connectionTimeout);
                } else {
                    socketWrapper.setReadTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            sendfileState = processSendfile(socketWrapper);
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (getErrorState().isError() || protocol.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileState == SendfileState.PENDING) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }
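- Http11Processor is a concrete subclass of AbstractProcessor. Its constructors create the Tomcat Request/Response, Http11InputBuffer, and Http11OutputBuffer objects introduced in earlier articles, wire them to each other, and associate the CoyoteAdapter object.
- The core method of Http11Processor is service(). The code is long, so we focus on the key points.
- service() initializes the Http11InputBuffer and Http11OutputBuffer instances introduced in earlier articles; they are used to parse the request line and request headers, write data, and so on.
- It first parses the request line with Http11InputBuffer.parseRequestLine(). If the request line is not fully parsed (for example, the client has not finished sending data), it returns SocketState.LONG. As described in the previous article, when ConnectionHandler sees the LONG state it registers an OP_READ event for the socket wrapper and adds it to the poller thread's event queue, so the poller thread keeps listening for readable events from the client and waits for the client to send more data. The association between the underlying socket and its Http11Processor is not removed, and the Http11Processor instance is not recycled, so the current state (the data already parsed) is preserved and processing can resume when the client sends more data.
- It then parses the request headers with Http11InputBuffer.parseHeaders(). If the headers are not fully parsed (the client has not finished sending data), the handling is the same as for the request line in the previous step.
- The method also contains some protocol upgrade handling (for example, WebSocket), which we do not expand on here.
- Once the request line and request headers are fully parsed, CoyoteAdapter.service() is called, which invokes the standard servlet API through the servlet container.
- After the servlet API call completes normally, a non-async request calls endRequest() to mark the end of the request. Internally it uses Http11InputBuffer.endRequest() to finish the request and Http11OutputBuffer.end() to send the remaining response data to the client.
- For non-async servlet requests it also calls Http11InputBuffer.nextRequest() and Http11OutputBuffer.nextRequest() to recycle the two instances so they can be reused later, which improves efficiency.
- When a non-async request ends normally, the returned socket state is SocketState.OPEN. According to the ConnectionHandler logic described in the previous article, OPEN means the connection is a keep-alive connection and the underlying socket is not closed. The mapping between the socket and the Http11Processor is therefore removed from the connection Map, and the current Http11Processor instance is released for later reuse. Because the connection is kept alive, an OP_READ event is registered for the socket wrapper and added to the poller thread's event queue, just as in the async case, so the poller thread keeps listening for readable events from the client. This ends the current request and prepares for the next one.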
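The way service() turns its flags into a final SocketState, and how ConnectionHandler is described as reacting to that state, can be condensed into a small sketch. This is an illustration under stated assumptions rather than Tomcat code: ServiceStateSketch, finalState, and handlerReaction are hypothetical names, the boolean parameters stand in for getErrorState().isError() || protocol.isPaused(), isAsync(), isUpgrade(), and the sendfile check, and the reaction strings only paraphrase the behaviour described in the text.

```java
final class ServiceStateSketch {

    // Reduced stand-in for Tomcat's SocketState enum.
    enum SocketState { OPEN, LONG, CLOSED, UPGRADING, SENDFILE }

    /** Mirrors the if/else chain at the end of service(), with flags passed in. */
    static SocketState finalState(boolean errorOrPaused, boolean async, boolean upgrade,
                                  boolean sendfilePending, boolean openSocket,
                                  boolean readComplete) {
        if (errorOrPaused) {
            return SocketState.CLOSED;
        } else if (async) {
            return SocketState.LONG;
        } else if (upgrade) {
            return SocketState.UPGRADING;
        } else if (sendfilePending) {
            return SocketState.SENDFILE;
        } else if (!openSocket) {
            return SocketState.CLOSED;
        }
        // Socket stays open: OPEN if the request was fully read, LONG if more data is needed.
        return readComplete ? SocketState.OPEN : SocketState.LONG;
    }

    /** Paraphrase of the ConnectionHandler behaviour described in the article. */
    static String handlerReaction(SocketState state) {
        switch (state) {
            case LONG:
                return "keep socket-processor mapping, register OP_READ";
            case OPEN:
                return "remove mapping, release processor, register OP_READ";
            case CLOSED:
                return "close socket";
            default:
                return "not covered in this sketch";
        }
    }

    public static void main(String[] args) {
        // Headers only partially read: openSocket = true, readComplete = false.
        SocketState partial = finalState(false, false, false, false, true, false);
        System.out.println(partial + ": " + handlerReaction(partial));
        // Keep-alive request finished normally.
        SocketState done = finalState(false, false, false, false, true, true);
        System.out.println(done + ": " + handlerReaction(done));
    }
}
```

The only difference between LONG and OPEN here is whether the processor stays attached to the socket, which is what lets a half-parsed request resume later.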
The core code of CoyoteAdapter is as follows:
    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {

        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            request.setResponse(response);
            response.setRequest(request);

            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);

            req.getParameters().setQueryStringCharset(connector.getURICharset());
        }

        if (connector.getXpoweredBy()) {
            response.addHeader("X-Powered-By", POWERED_BY);
        }

        boolean async = false;
        boolean postParseSuccess = false;

        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
            }
            if (request.isAsync()) {
                async = true;
                ReadListener readListener = req.getReadListener();
                if (readListener != null && request.isFinished()) {
                    // Possible the all data may have been read during service()
                    // method so this needs to be checked here
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        if (req.sendAllDataReadEvent()) {
                            req.getReadListener().onAllDataRead();
                        }
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                }

                Throwable throwable =
                        (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

                // If an async request was started, is not going to end once
                // this container thread finishes and an error occurred, trigger
                // the async error process
                if (!request.isAsyncCompleting() && throwable != null) {
                    request.getAsyncContextInternal().setErrorState(throwable, true);
                }
            } else {
                request.finishRequest();
                response.finishResponse();
            }

        } catch (IOException e) {
            // Ignore
        } finally {
            AtomicBoolean error = new AtomicBoolean(false);
            res.action(ActionCode.IS_ERROR, error);

            if (request.isAsyncCompleting() && error.get()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS, null);
                async = false;
            }

            // Access log
            if (!async && postParseSuccess) {
                // Log only if processing was invoked.
                // If postParseRequest() failed, it has already logged it.
                Context context = request.getContext();
                Host host = request.getHost();
                // If the context is null, it is likely that the endpoint was
                // shutdown, this connection closed and the request recycled in
                // a different thread. That thread will have updated the access
                // log so it is OK not to update the access log here in that
                // case.
                // The other possibility is that an error occurred early in
                // processing and the request could not be mapped to a Context.
                // Log via the host or engine in that case.
                long time = System.currentTimeMillis() - req.getStartTime();
                if (context != null) {
                    context.logAccess(request, response, time, false);
                } else if (response.isError()) {
                    if (host != null) {
                        host.logAccess(request, response, time, false);
                    } else {
                        connector.getService().getContainer().logAccess(
                                request, response, time, false);
                    }
                }
            }

            req.getRequestProcessor().setWorkerThreadName(null);

            // Recycle the wrapper request and response
            if (!async) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
    }
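- The core method of CoyoteAdapter is service(). The code is long, so we focus on the key points.
- The method uses the Tomcat (Coyote) request and response to create the servlet-level request and response and links them together: the Coyote request is attached to the servlet request, and the Coyote response to the servlet response. The created wrappers are cached on the Coyote objects under ADAPTER_NOTES, so they are created only once per processor.
- It invokes the standard servlet API through the servlet container: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response)
- For a non-async request, once the servlet API call completes, the current request and response are finished by calling finishRequest() and finishResponse() on the connector Request and Response objects (the classes that implement HttpServletRequest and HttpServletResponse).
- Finally, recycle() is called on the same Request and Response objects to recycle them, so they can be reused later for efficiency.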
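The create-once, cache-in-a-note, recycle-in-finally pattern used by CoyoteAdapter.service() can be illustrated with a minimal stand-alone sketch. Everything here is an assumption made for illustration: CoyoteRequest and WrapperRequest are hypothetical stand-ins for org.apache.coyote.Request and the connector Request, the note slot index is arbitrary, and assigning uri stands in for invoking the container pipeline.

```java
final class AdapterNotesSketch {

    // Illustrative note slot index (assumption, not taken from the article).
    static final int ADAPTER_NOTES = 1;

    /** Hypothetical stand-in for org.apache.coyote.Request with its notes array. */
    static final class CoyoteRequest {
        private final Object[] notes = new Object[4];
        Object getNote(int pos) { return notes[pos]; }
        void setNote(int pos, Object value) { notes[pos] = value; }
    }

    /** Hypothetical stand-in for the servlet-level connector Request. */
    static final class WrapperRequest {
        String uri;
        int recycleCount;

        // Clears per-request state so the same object can serve the next request.
        void recycle() {
            uri = null;
            recycleCount++;
        }
    }

    static WrapperRequest service(CoyoteRequest req, String uri) {
        // Reuse the wrapper cached on the low-level request, creating it on first use.
        WrapperRequest request = (WrapperRequest) req.getNote(ADAPTER_NOTES);
        if (request == null) {
            request = new WrapperRequest();
            req.setNote(ADAPTER_NOTES, request);
        }
        try {
            request.uri = uri; // stand-in for invoking the container pipeline
        } finally {
            // Non-async path: always recycle so the wrapper is clean for the next request.
            request.recycle();
        }
        return request;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        WrapperRequest first = service(req, "/a");
        WrapperRequest second = service(req, "/b");
        System.out.println("same wrapper reused: " + (first == second));
    }
}
```

Caching the wrapper on the low-level request avoids allocating a new servlet-level object for every request on a keep-alive connection, at the cost of having to reset its state reliably.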