
Tomcat NIO (10): Key Classes of the IO Thread


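In the previous article we covered the overall call flow of the Tomcat IO thread and summarized the core logic of the key classes SocketProcessor and ConnectionHandler. Here we cover the remaining core classes: AbstractProcessorLight, Http11Processor, and CoyoteAdapter.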

The core logic of AbstractProcessorLight is as follows:

    public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {
        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
                // Do nothing here, just wait for it to get recycled
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. To avoid this,
                    // process it now.
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ) {
                state = service(socketWrapper);
            } else {
                // Default to closing the socket if the SocketEvent passed in
                // is not consistent with the current state of the Processor
                state = SocketState.CLOSED;
            }

            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status
                        + "], State out: [" + state + "]");
            }

            if (state != SocketState.CLOSED && isAsync()) {
                state = asyncPostProcess();
                if (getLog().isDebugEnabled()) {
                    getLog().debug("Socket: [" + socketWrapper
                            + "], State after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasNext()) {
                dispatches = getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END
                || dispatches != null && state != SocketState.CLOSED);

        return state;
    }

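  • The core method of this class is process(), which is called by the process() method of the ConnectionHandler instance introduced in the previous article.
  • Depending on the socket event and on whether asynchronous processing is in use, the method takes different branches and returns the desired SocketState. Here we only cover the normal, non-async path.
  • On the normal, non-async path the SocketEvent is OPEN_READ, and control enters the service() method of the Http11Processor instance.
  • For any SocketEvent that none of the earlier branches handles, the SocketState returned to the ConnectionHandler instance is CLOSED. As described in earlier articles, this result causes SocketProcessor to close the underlying socket.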
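
The non-async branch selection described above can be condensed into a small sketch. It is a simplified model, not Tomcat code: the enums mirror only the values needed here, and ProcessSketch, nextState, and the serviceResult parameter (standing in for whatever service() would return) are hypothetical names introduced for illustration.

```java
/** Simplified model of how process() picks its result for non-async requests. */
final class ProcessSketch {

    enum SocketEvent { OPEN_READ, OPEN_WRITE, DISCONNECT, ERROR }

    enum SocketState { OPEN, LONG, CLOSED }

    /**
     * serviceResult is an assumption of this sketch: it stands in for the value
     * that Http11Processor.service() would return after parsing the request.
     */
    static SocketState nextState(SocketEvent event, SocketState serviceResult) {
        switch (event) {
            case DISCONNECT:
                // Nothing is done; the state keeps its initial value, CLOSED.
                return SocketState.CLOSED;
            case OPEN_WRITE:
                // An extra write event is ignored and reported as LONG.
                return SocketState.LONG;
            case OPEN_READ:
                // Normal path: the socket is handed to service().
                return serviceResult;
            default:
                // Any other event closes the socket.
                return SocketState.CLOSED;
        }
    }

    public static void main(String[] args) {
        for (SocketEvent event : SocketEvent.values()) {
            System.out.println(event + " -> " + nextState(event, SocketState.OPEN));
        }
    }
}
```

Only the OPEN_READ branch depends on what service() does; every other branch in this model yields a fixed state.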

The core code of Http11Processor is as follows:

    // AbstractProcessor
    public AbstractProcessor(Adapter adapter) {
        this(adapter, new Request(), new Response());
    }

    // Http11Processor
    public Http11Processor(AbstractHttp11Protocol<?> protocol, Adapter adapter) {
        super(adapter);
        this.protocol = protocol;

        httpParser = new HttpParser(protocol.getRelaxedPathChars(),
                protocol.getRelaxedQueryChars());

        inputBuffer = new Http11InputBuffer(request, protocol.getMaxHttpHeaderSize(),
                protocol.getRejectIllegalHeaderName(), httpParser);
        request.setInputBuffer(inputBuffer);

        outputBuffer = new Http11OutputBuffer(response, protocol.getMaxHttpHeaderSize());
        response.setOutputBuffer(outputBuffer);

        // Create and add the identity filters.
        inputBuffer.addFilter(new IdentityInputFilter(protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new IdentityOutputFilter());

        // Create and add the chunked filters.
        inputBuffer.addFilter(new ChunkedInputFilter(protocol.getMaxTrailerSize(),
                protocol.getAllowedTrailerHeadersInternal(), protocol.getMaxExtensionSize(),
                protocol.getMaxSwallowSize()));
        outputBuffer.addFilter(new ChunkedOutputFilter());

        // Create and add the void filters.
        inputBuffer.addFilter(new VoidInputFilter());
        outputBuffer.addFilter(new VoidOutputFilter());

        // Create and add buffered input filter
        inputBuffer.addFilter(new BufferedInputFilter());

        // Create and add the chunked filters.
        // inputBuffer.addFilter(new GzipInputFilter());
        outputBuffer.addFilter(new GzipOutputFilter());

        pluggableFilterIndex = inputBuffer.getFilters().length;
    }

    public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException {
        RequestInfo rp = request.getRequestProcessor();
        rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

        setSocketWrapper(socketWrapper);
        inputBuffer.init(socketWrapper);
        outputBuffer.init(socketWrapper);

        keepAlive = true;
        openSocket = false;
        readComplete = true;
        boolean keptAlive = false;
        SendfileState sendfileState = SendfileState.DONE;

        while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null
                && sendfileState == SendfileState.DONE && !protocol.isPaused()) {

            // Parsing the request header
            try {
                if (!inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),
                        protocol.getKeepAliveTimeout())) {
                    if (inputBuffer.getParsingRequestLinePhase() == -1) {
                        return SocketState.UPGRADING;
                    } else if (handleIncompleteRequestLineRead()) {
                        break;
                    }
                }

                if (protocol.isPaused()) {
                    // 503 - Service unavailable
                    response.setStatus(503);
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                } else {
                    keptAlive = true;
                    // Set this every time in case limit has been changed via JMX
                    request.getMimeHeaders().setLimit(protocol.getMaxHeaderCount());
                    if (!inputBuffer.parseHeaders()) {
                        // We've read part of the request, don't recycle it
                        // instead associate it with the socket
                        openSocket = true;
                        readComplete = false;
                        break;
                    }
                    if (!protocol.getDisableUploadTimeout()) {
                        socketWrapper.setReadTimeout(protocol.getConnectionUploadTimeout());
                    }
                }
            } catch (IOException e) {
                if (log.isDebugEnabled()) {
                    log.debug(sm.getString("http11processor.header.parse"), e);
                }
                setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                break;
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                UserDataHelper.Mode logMode = userDataHelper.getNextMode();
                if (logMode != null) {
                    String message = sm.getString("http11processor.header.parse");
                    switch (logMode) {
                        case INFO_THEN_DEBUG:
                            message += sm.getString("http11processor.fallToDebug");
                            // fall through
                        case INFO:
                            log.info(message, t);
                            break;
                        case DEBUG:
                            log.debug(message, t);
                    }
                }
                // 400 - Bad Request
                response.setStatus(400);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
            }

            // Has an upgrade been requested?
            Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection");
            boolean foundUpgrade = false;
            while (connectionValues.hasMoreElements() && !foundUpgrade) {
                foundUpgrade = connectionValues.nextElement().toLowerCase(
                        Locale.ENGLISH).contains("upgrade");
            }

            if (foundUpgrade) {
                // Check the protocol
                String requestedProtocol = request.getHeader("Upgrade");

                UpgradeProtocol upgradeProtocol = protocol.getUpgradeProtocol(requestedProtocol);
                if (upgradeProtocol != null) {
                    if (upgradeProtocol.accept(request)) {
                        response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS);
                        response.setHeader("Connection", "Upgrade");
                        response.setHeader("Upgrade", requestedProtocol);
                        action(ActionCode.CLOSE, null);
                        getAdapter().log(request, response, 0);

                        InternalHttpUpgradeHandler upgradeHandler =
                                upgradeProtocol.getInternalUpgradeHandler(
                                        socketWrapper, getAdapter(), cloneRequest(request));
                        UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null);
                        action(ActionCode.UPGRADE, upgradeToken);
                        return SocketState.UPGRADING;
                    }
                }
            }

            if (getErrorState().isIoAllowed()) {
                // Setting up filters, and parse some request headers
                rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
                try {
                    prepareRequest();
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("http11processor.request.prepare"), t);
                    }
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                }
            }

            int maxKeepAliveRequests = protocol.getMaxKeepAliveRequests();
            if (maxKeepAliveRequests == 1) {
                keepAlive = false;
            } else if (maxKeepAliveRequests > 0 &&
                    socketWrapper.decrementKeepAlive() <= 0) {
                keepAlive = false;
            }

            // Process the request in the adapter
            if (getErrorState().isIoAllowed()) {
                try {
                    rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                    getAdapter().service(request, response);
                    // Handle when the response was committed before a serious
                    // error occurred.  Throwing a ServletException should both
                    // set the status to 500 and set the errorException.
                    // If we fail here, then the response is likely already
                    // committed, so we can't try and set headers.
                    if (keepAlive && !getErrorState().isError() && !isAsync()
                            && statusDropsConnection(response.getStatus())) {
                        setErrorState(ErrorState.CLOSE_CLEAN, null);
                    }
                } catch (InterruptedIOException e) {
                    setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
                } catch (HeadersTooLargeException e) {
                    log.error(sm.getString("http11processor.request.process"), e);
                    // The response should not have been committed but check it
                    // anyway to be safe
                    if (response.isCommitted()) {
                        setErrorState(ErrorState.CLOSE_NOW, e);
                    } else {
                        response.reset();
                        response.setStatus(500);
                        setErrorState(ErrorState.CLOSE_CLEAN, e);
                        response.setHeader("Connection", "close"); // TODO: Remove
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("http11processor.request.process"), t);
                    // 500 - Internal Server Error
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, t);
                    getAdapter().log(request, response, 0);
                }
            }

            // Finish the handling of the request
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);
            if (!isAsync()) {
                // If this is an async request then the request ends when it has
                // been completed. The AsyncContext is responsible for calling
                // endRequest() in that case.
                endRequest();
            }
            rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

            // If there was an error, make sure the request is counted as
            // an error, and update the statistics counter
            if (getErrorState().isError()) {
                response.setStatus(500);
            }

            if (!isAsync() || getErrorState().isError()) {
                request.updateCounters();
                if (getErrorState().isIoAllowed()) {
                    inputBuffer.nextRequest();
                    outputBuffer.nextRequest();
                }
            }

            if (!protocol.getDisableUploadTimeout()) {
                int connectionTimeout = protocol.getConnectionTimeout();
                if (connectionTimeout > 0) {
                    socketWrapper.setReadTimeout(connectionTimeout);
                } else {
                    socketWrapper.setReadTimeout(0);
                }
            }

            rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

            sendfileState = processSendfile(socketWrapper);
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

        if (getErrorState().isError() || protocol.isPaused()) {
            return SocketState.CLOSED;
        } else if (isAsync()) {
            return SocketState.LONG;
        } else if (isUpgrade()) {
            return SocketState.UPGRADING;
        } else {
            if (sendfileState == SendfileState.PENDING) {
                return SocketState.SENDFILE;
            } else {
                if (openSocket) {
                    if (readComplete) {
                        return SocketState.OPEN;
                    } else {
                        return SocketState.LONG;
                    }
                } else {
                    return SocketState.CLOSED;
                }
            }
        }
    }

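  • Http11Processor is a concrete subclass of AbstractProcessor. Its constructor (through super(adapter)) creates the Tomcat Request/Response, Http11InputBuffer, and Http11OutputBuffer objects introduced in earlier articles, wires them together, and associates the CoyoteAdapter object.
  • The core method of Http11Processor is service(). The code is long, so we focus on the key points.
  • service() initializes the Http11InputBuffer and Http11OutputBuffer instances introduced in earlier articles; they are used to parse the request line and headers, write data, and so on.
  • It first parses the request line with the parseRequestLine() method of the Http11InputBuffer instance. If the request line has not been fully parsed (for example, the client has not finished sending data), service() returns SocketState.LONG. As described in the previous article, when ConnectionHandler sees LONG it registers OP_READ for the socket wrapper and adds it to the poller thread's event queue, so that the poller thread keeps listening for readable events from the client and waits for the rest of the data. It does not remove the association between the underlying socket and its Http11Processor, nor does it recycle the Http11Processor instance, so the current state (the data already parsed) is preserved and processing can resume when the client sends more data.
  • It then parses the request headers with Http11InputBuffer.parseHeaders(). If the headers have not been fully parsed (the client has not finished sending data), the loop breaks with openSocket = true and readComplete = false, so the result is handled the same way as for the request line in the previous step.
  • The method also contains some protocol-upgrade handling (for example, WebSocket), which we do not expand on here.
  • Once the request line and headers are fully parsed, CoyoteAdapter.service() is called through getAdapter().service(request, response); it invokes the standard servlet API through the servlet container.
  • After the servlet API call completes normally, endRequest() is called for non-async requests to finish the request. Internally it uses Http11InputBuffer.endRequest() to end the request and Http11OutputBuffer.end() to send any remaining response data to the client.
  • For non-async servlet requests it also calls Http11InputBuffer.nextRequest() and Http11OutputBuffer.nextRequest() to recycle both instances so they can be reused later, which improves efficiency.
  • When a non-async request finishes normally on a keep-alive connection, the socket state returned is SocketState.OPEN (if keepAlive has been set to false, the loop ends with openSocket still false and CLOSED is returned instead). According to the ConnectionHandler described in the previous article, OPEN means the connection is persistent and the underlying socket is not closed. The handler therefore removes the socket-to-Http11Processor mapping from its Map and releases the current Http11Processor instance for reuse. Because the connection is persistent, it then, as in the LONG case, registers OP_READ for the socket wrapper and adds it to the poller thread's event queue so that the poller keeps listening for readable events from the client. This ends the current request and prepares for the next one.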
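
The final return-value decision of service() and the handler's reaction described above can be sketched as follows. This is a simplified model restricted to the non-async path with no upgrade and no pending sendfile; ServiceResultSketch, decide, and handlerAction are hypothetical names, and the strings returned by handlerAction merely paraphrase the behavior described in this article rather than quoting Tomcat.

```java
/** Simplified model of the value service() returns on the non-async path. */
final class ServiceResultSketch {

    enum SocketState { OPEN, LONG, CLOSED }

    /**
     * Mirrors the tail of service() for non-async, non-upgrade requests
     * without sendfile: an error or a socket that is not kept open means CLOSED,
     * an incomplete read means LONG, otherwise OPEN.
     */
    static SocketState decide(boolean error, boolean openSocket, boolean readComplete) {
        if (error || !openSocket) {
            return SocketState.CLOSED;
        }
        return readComplete ? SocketState.OPEN : SocketState.LONG;
    }

    /** Paraphrase of how the connection handler reacts, as described in the text. */
    static String handlerAction(SocketState state) {
        switch (state) {
            case LONG:
                return "keep processor bound to socket, register OP_READ";
            case OPEN:
                return "release processor for reuse, register OP_READ";
            default:
                return "close socket";
        }
    }

    public static void main(String[] args) {
        // Headers only partially received: the processor must keep its state.
        SocketState partial = decide(false, true, false);
        System.out.println(partial + ": " + handlerAction(partial));

        // Request fully handled on a keep-alive connection.
        SocketState done = decide(false, true, true);
        System.out.println(done + ": " + handlerAction(done));
    }
}
```

The difference between LONG and OPEN in this model is only whether the processor stays bound to the socket; both leave the socket registered for further reads.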

The core code of CoyoteAdapter is as follows:

    public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);

        if (request == null) {
            request = connector.createRequest();
            request.setCoyoteRequest(req);
            response = connector.createResponse();
            response.setCoyoteResponse(res);

            request.setResponse(response);
            response.setRequest(request);

            req.setNote(ADAPTER_NOTES, request);
            res.setNote(ADAPTER_NOTES, response);

            req.getParameters().setQueryStringCharset(connector.getURICharset());
        }

        if (connector.getXpoweredBy()) {
            response.addHeader("X-Powered-By", POWERED_BY);
        }

        boolean async = false;
        boolean postParseSuccess = false;

        req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());

        try {
            // Parse and set Catalina and configuration specific
            // request parameters
            postParseSuccess = postParseRequest(req, request, res, response);
            if (postParseSuccess) {
                // check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
            }
            if (request.isAsync()) {
                async = true;
                ReadListener readListener = req.getReadListener();
                if (readListener != null && request.isFinished()) {
                    // Possible the all data may have been read during service()
                    // method so this needs to be checked here
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        if (req.sendAllDataReadEvent()) {
                            req.getReadListener().onAllDataRead();
                        }
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                }

                Throwable throwable =
                        (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);

                // If an async request was started, is not going to end once
                // this container thread finishes and an error occurred, trigger
                // the async error process
                if (!request.isAsyncCompleting() && throwable != null) {
                    request.getAsyncContextInternal().setErrorState(throwable, true);
                }
            } else {
                request.finishRequest();
                response.finishResponse();
            }
        } catch (IOException e) {
            // Ignore
        } finally {
            AtomicBoolean error = new AtomicBoolean(false);
            res.action(ActionCode.IS_ERROR, error);

            if (request.isAsyncCompleting() && error.get()) {
                // Connection will be forcibly closed which will prevent
                // completion happening at the usual point. Need to trigger
                // call to onComplete() here.
                res.action(ActionCode.ASYNC_POST_PROCESS, null);
                async = false;
            }

            // Access log
            if (!async && postParseSuccess) {
                // Log only if processing was invoked.
                // If postParseRequest() failed, it has already logged it.
                Context context = request.getContext();
                Host host = request.getHost();
                // If the context is null, it is likely that the endpoint was
                // shutdown, this connection closed and the request recycled in
                // a different thread. That thread will have updated the access
                // log so it is OK not to update the access log here in that
                // case.
                // The other possibility is that an error occurred early in
                // processing and the request could not be mapped to a Context.
                // Log via the host or engine in that case.
                long time = System.currentTimeMillis() - req.getStartTime();
                if (context != null) {
                    context.logAccess(request, response, time, false);
                } else if (response.isError()) {
                    if (host != null) {
                        host.logAccess(request, response, time, false);
                    } else {
                        connector.getService().getContainer().logAccess(
                                request, response, time, false);
                    }
                }
            }

            req.getRequestProcessor().setWorkerThreadName(null);

            // Recycle the wrapper request and response
            if (!async) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
    }

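  • The core method of CoyoteAdapter is service(). The code is long, so we focus on the key points.
  • The method uses the Tomcat request and response to create the standard servlet request and response and sets up their associations, that is, it links the Tomcat request to the servlet request and the Tomcat response to the servlet response.
  • It invokes the standard servlet API through the servlet container: connector.getService().getContainer().getPipeline().getFirst().invoke(request, response).
  • If the request is not asynchronous, then once the servlet API call completes, the current request and response are finished by calling request.finishRequest() and response.finishResponse(). These are methods of the Request and Response wrappers created by the connector, not of the HttpServletRequest/HttpServletResponse interfaces.
  • Finally, request.recycle() and response.recycle() on the same wrappers recycle the request and response so they can be reused later, which improves efficiency.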
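
The create-once, recycle-after-use pattern described above can be illustrated with a small self-contained sketch. All class names here (AdapterSketch, CoyoteRequest, ServletRequestWrapper) are hypothetical stand-ins, the note slot is modeled with a Map purely for brevity, and the attribute field stands in for arbitrary per-request state; none of this is the actual Tomcat implementation.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of how service() caches and recycles its wrapper objects. */
final class AdapterSketch {

    // Slot under which the wrapper is stored on the low-level request (assumed value).
    static final int ADAPTER_NOTES = 1;

    /** Stand-in for the low-level Tomcat request, which can carry notes. */
    static final class CoyoteRequest {
        private final Map<Integer, Object> notes = new HashMap<>();

        Object getNote(int pos) { return notes.get(pos); }

        void setNote(int pos, Object value) { notes.put(pos, value); }
    }

    /** Stand-in for the servlet-facing request wrapper. */
    static final class ServletRequestWrapper {
        CoyoteRequest coyoteRequest;
        String attribute; // hypothetical per-request state

        // Clears per-request state but keeps the object for the next request.
        void recycle() { attribute = null; }
    }

    /** Returns the cached wrapper, creating and linking it on first use. */
    static ServletRequestWrapper wrapperFor(CoyoteRequest req) {
        ServletRequestWrapper wrapper = (ServletRequestWrapper) req.getNote(ADAPTER_NOTES);
        if (wrapper == null) {
            wrapper = new ServletRequestWrapper();
            wrapper.coyoteRequest = req;
            req.setNote(ADAPTER_NOTES, wrapper);
        }
        return wrapper;
    }

    /** Handles one non-async request and recycles the wrapper afterwards. */
    static ServletRequestWrapper service(CoyoteRequest req, String value) {
        ServletRequestWrapper wrapper = wrapperFor(req);
        try {
            wrapper.attribute = value; // stands in for invoking the container
        } finally {
            wrapper.recycle();
        }
        return wrapper;
    }

    public static void main(String[] args) {
        CoyoteRequest req = new CoyoteRequest();
        ServletRequestWrapper first = service(req, "request-1");
        ServletRequestWrapper second = service(req, "request-2");
        System.out.println("same wrapper reused: " + (first == second));
    }
}
```

Because the wrapper is stored on the low-level request, a later request handled with the same low-level object finds the wrapper already in place and only has to repopulate it.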

That is all for now. In the next article we continue with reads and writes in the Tomcat IO thread.
