1. 程式人生 > 程式設計 >Tomcat中的聯結器是如何設計的

Tomcat中的聯結器是如何設計的

上期回顧

上一篇文章《Tomcat在SpringBoot中是如何啟動的》從main方法啟動說起,窺探了SpringBoot是如何啟動Tomcat的,在分析Tomcat中我們重點提到了,Tomcat主要包括2個元件,聯結器(Connector)和容器(Container)以及他們的內部結構圖,那麼今天我們來分析下Tomcat中的聯結器是怎麼設計的以及它的作用是什麼。

說明:本文tomcat版本是9.0.21,不建議零基礎讀者閱讀。

從聯結器(Connector)原始碼說起

既然是來解析聯結器(Connector),那麼我們直接從原始碼入手,後面所有原始碼我會剔除不重要部分,所以會忽略大部分原始碼細節,只關注流程。原始碼如下(高能預警,大量程式碼):

public class Connector extends LifecycleMBeanBase  {
    public Connector() {
        this("org.apache.coyote.http11.Http11NioProtocol");
    }


    public Connector(String protocol) {
        boolean aprConnector = AprLifecycleListener.isAprAvailable() &&
                AprLifecycleListener.getUseAprConnector();

        if
("HTTP/1.1".equals(protocol) || protocol == null) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.http11.Http11AprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.http11.Http11NioProtocol"; } } else
if ("AJP/1.3".equals(protocol)) { if (aprConnector) { protocolHandlerClassName = "org.apache.coyote.ajp.AjpAprProtocol"; } else { protocolHandlerClassName = "org.apache.coyote.ajp.AjpNioProtocol"; } } else { protocolHandlerClassName = protocol; } // Instantiate protocol handler ProtocolHandler p = null; try { Class<?> clazz = Class.forName(protocolHandlerClassName); p = (ProtocolHandler) clazz.getConstructor().newInstance(); } catch (Exception e) { log.error(sm.getString( "coyoteConnector.protocolHandlerInstantiationFailed"),e); } finally { this.protocolHandler = p; } // Default for Connector depends on this system property setThrowOnFailure(Boolean.getBoolean("org.apache.catalina.startup.EXIT_ON_INIT_FAILURE")); } 複製程式碼

我們來看看Connector的構造方法,其實只做了一件事情,就是根據協議設定對應的ProtocolHandler,根據名稱我們知道,這是協議處理類,所以聯結器內部的一個重要子模組就是ProtocolHandler

關於生命週期

我們看到Connector繼承了LifecycleMBeanBase,我們來看看Connector的最終繼承關係:

我們看到最終實現的是Lifecycle介面,我們看看這個介面是何方神聖。我把其介面的註釋拿下來解釋下

/**
 * Common interface for component life cycle methods.  Catalina components
 * may implement this interface (as well as the appropriate interface(s) for
 * the functionality they support) in order to provide a consistent mechanism
 * to start and stop the component.
 *            start()
 *  -----------------------------
 *  |                           |
 *  | init()                    |
 * NEW -»-- INITIALIZING        |
 * | |           |              |     ------------------«-----------------------
 * | |           |auto          |     |                                        |
 * | |          \|/    start() \|/   \|/     auto          auto         stop() |
 * | |      INITIALIZED --»-- STARTING_PREP --»- STARTING --»- STARTED --»---  |
 * | |         |                                                            |  |
 * | |destroy()|                                                            |  |
 * | --»-----«--    ------------------------«--------------------------------  ^
 * |     |          |                                                          |
 * |     |         \|/          auto                 auto              start() |
 * |     |     STOPPING_PREP ----»---- STOPPING ------»----- STOPPED -----»-----
 * |    \|/                               ^                     |  ^
 * |     |               stop()           |                     |  |
 * |     |       --------------------------                     |  |
 * |     |       |                                              |  |
 * |     |       |    destroy()                       destroy() |  |
 * |     |    FAILED ----»------ DESTROYING ---«-----------------  |
 * |     |                        ^     |                          |
 * |     |     destroy()          |     |auto                      |
 * |     --------»-----------------    \|/                         |
 * |                                 DESTROYED                     |
 * |                                                               |
 * |                            stop()                             |
 * ----»-----------------------------»------------------------------
 *
 * Any state can transition to FAILED.
 *
 * Calling start() while a component is in states STARTING_PREP,STARTING or
 * STARTED has no effect.
 *
 * Calling start() while a component is in state NEW will cause init() to be
 * called immediately after the start() method is entered.
 *
 * Calling stop() while a component is in states STOPPING_PREP,STOPPING or
 * STOPPED has no effect.
 *
 * Calling stop() while a component is in state NEW transitions the component
 * to STOPPED. This is typically encountered when a component fails to start and
 * does not start all its sub-components. When the component is stopped,it will
 * try to stop all sub-components - even those it didn't start.
 *
 * Attempting any other transition will throw {@link LifecycleException}.
 *
 * </pre>
 * The {@link LifecycleEvent}s fired during state changes are defined in the
 * methods that trigger the changed. No {@link LifecycleEvent}s are fired if the
 * attempted transition is not valid.
複製程式碼

這段註釋翻譯就是,這個介面是提供給元件宣告週期管理的,並且提供了宣告週期流轉圖。這裡我們只需要知道正常流程即可:

New--->Init()---->Start()---->Stop()--->Destory()

從生命週期探索聯結器

根據上面的生命週期說明,我們可以知道聯結器(Connector)就是按照如此的宣告週期管理的,所以我們找到了線索,所以聯結器肯定會先初始化然後再啟動。我們檢視其initInternal()方法可以知道聯結器初始化做了什麼事情,原始碼如下:

    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        if (protocolHandler == null) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInstantiationFailed"));
        }

        // Initialize adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);
        if (service != null) {
            protocolHandler.setUtilityExecutor(service.getServer().getUtilityExecutor());
        }

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isInstanceCreated()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoAprListener",getProtocolHandlerClassName()));
        }
        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoAprLibrary",getProtocolHandlerClassName()));
        }
        if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                protocolHandler instanceof AbstractHttp11JsseProtocol) {
            AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                    (AbstractHttp11JsseProtocol<?>) protocolHandler;
            if (jsseProtocolHandler.isSSLEnabled() &&
                    jsseProtocolHandler.getSslImplementationName() == null) {
                // OpenSSL is compatible with the JSSE configuration,so use it if APR is available
                jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
            }
        }

        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"),e);
        }
    }
}
複製程式碼

根據上面原始碼,我們發現主要是處理protocolHandler並初始化它,同時我們注意到了protocolHandler 設定了一個介面卡,我們看看這個介面卡是做啥的,跟蹤原始碼如下:

   /**
     * The adapter,used to call the connector.
     *
     * @param adapter The adapter to associate
     */
    public void setAdapter(Adapter adapter);
複製程式碼

這個註釋已經說的很直白了,這個介面卡就是用來呼叫聯結器的。我們再繼續看看protocolHandler的初始化方法

 /**
     * Endpoint that provides low-level network I/O - must be matched to the
     * ProtocolHandler implementation (ProtocolHandler using NIO,requires NIO
     * Endpoint etc.).
     */
private final AbstractEndpoint<S,?> endpoint;

public void init() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.init",getName()));
            logPortOffset();
        }

        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null,null).registerComponent(this,oname,null);
            }
        }

        if (this.domain != null) {
            rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null,null).registerComponent(
                    getHandler().getGlobal(),rgOname,null);
        }

        String endpointName = getName();
        endpoint.setName(endpointName.substring(1,endpointName.length()-1));
        endpoint.setDomain(domain);

        endpoint.init();
    }
複製程式碼

這裡出現了一個新的物件,endpoint,根據註釋我們可以知道endpoint是用來處理網路IO的,而且必須匹配到指定的子類(比如Nio,就是NioEndPoint處理)。endpoint.init()實際上就是做一些網路的配置,然後就是初始化完畢了。根據我們上面的週期管理,我們知道init()後就是start(),所以我們檢視Connectorstart()原始碼:

 protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPortWithOffset() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort",Integer.valueOf(getPortWithOffset())));
        }

        setState(LifecycleState.STARTING);

        try {
            protocolHandler.start();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStartFailed"),e);
        }
    }
複製程式碼

其實就是主要呼叫protocolHandler.start()方法,繼續跟蹤,為了方便表述,我會把接下來的程式碼統一放在一起說明,程式碼如下:

//1.類:AbstractProtocol implements ProtocolHandler,MBeanRegistration
 public void start() throws Exception {
     // 省略部分程式碼
    endpoint.start();
    }

//2. 類:AbstractEndPoint   
public final void start() throws Exception {
       // 省略部分程式碼
        startInternal();
    }
 /**3.類:NioEndPoint extends AbstractJsseEndpoint<NioChannel,SocketChannel>
     * Start the NIO endpoint,creating acceptor,poller threads.
     */
    @Override
    public void startInternal() throws Exception {
        //省略部分程式碼
       
            // Start poller thread
            poller = new Poller();
            Thread pollerThread = new Thread(poller,getName() + "-ClientPoller");
            pollerThread.setPriority(threadPriority);
            pollerThread.setDaemon(true);
            pollerThread.start();

            startAcceptorThread();
        }
    }
複製程式碼

到這裡,其實整個啟動程式碼就完成了,我們看到最後是在NioEndPoint建立了一個Poller,並且啟動它,這裡需要補充說明下,這裡只是以NioEndPoint為示列,其實Tomcat 主要提供了三種實現,分別是AprEndPoint,NioEndPoint,Nio2EndPoint,這裡表示了tomcat支援的I/O模型:

APR:採用 Apache 可移植執行庫實現,它根據不同作業系統,分別用c重寫了大部分IO和系統執行緒操作模組,據說效能要比其他模式要好(未實測)。

NIO:非阻塞 I/O

NIO.2:非同步 I/O

上述程式碼主要是開啟兩個執行緒,一個是Poller,一個是開啟Acceptor,既然是執行緒,核心的程式碼肯定是run方法,我們來檢視原始碼,程式碼如下:

//4.類:Acceptor<U> implements Runnable
 public void run() {
 //省略了部分程式碼
                U socket = null;
                    socket = endpoint.serverSocketAccept();
                // Configure the socket
                if (endpoint.isRunning() && !endpoint.isPaused()) {
                    // setSocketOptions() will hand the socket off to
                    // an appropriate processor if successful
                    //核心邏輯
                    if (!endpoint.setSocketOptions(socket)) {
                        endpoint.closeSocket(socket);
                    }
                } else {
                    endpoint.destroySocket(socket);
                }
            
        state = AcceptorState.ENDED;
}
//5.類:NioEndpoint
protected boolean setSocketOptions(SocketChannel socket) {
        // Process the connection
        //省略部分程式碼
        try {
            // Disable blocking,polling will be used
            socket.configureBlocking(false);
            Socket sock = socket.socket();
            socketProperties.setProperties(sock);


            NioSocketWrapper socketWrapper = new NioSocketWrapper(channel,this);
            channel.setSocketWrapper(socketWrapper);
            socketWrapper.setReadTimeout(getConnectionTimeout());
            socketWrapper.setWriteTimeout(getConnectionTimeout());
            socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
            socketWrapper.setSecure(isSSLEnabled());
            //核心邏輯
            poller.register(channel,socketWrapper);
            return true;
  
    }
複製程式碼

這裡可以發現Acceptor主要就是接受socket,然後把它註冊到poller中,我們繼續看看是如何註冊的。

/**6.類NioEndpoint
         * Registers a newly created socket with the poller.
         *
         * @param socket    The newly created socket
         * @param socketWrapper The socket wrapper
         */
        public void register(final NioChannel socket,final NioSocketWrapper socketWrapper) {
            socketWrapper.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
            PollerEvent r = null;
            if (eventCache != null) {
                r = eventCache.pop();
            }
            if (r == null) {
                r = new PollerEvent(socket,OP_REGISTER);
            } else {
                r.reset(socket,OP_REGISTER);
            }
            addEvent(r);
        }
/** 7.類:PollerEvent implements Runnable
 public void run() {
    //省略部分程式碼
    socket.getIOChannel().register(socket.getSocketWrapper().getPoller().getSelector(),SelectionKey.OP_READ,socket.getSocketWrapper());
        }
複製程式碼

這裡發現最終就是採用NIO模型把其註冊到通道中。(這裡涉及NIO網路程式設計知識,不瞭解的同學可以傳送這裡)。那麼註冊完畢後,我們看看Poller做了什麼事情。

*/        
  /**8.類:NioEndPoint內部類 Poller implements Runnable
  **/  
  @Override
        public void run() {
            // Loop until destroy() is called
            while (true) {
                //省略部分程式碼

                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                // Walk through the collection of ready keys and dispatch
                // any active event.
                while (iterator != null && iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
                    // Attachment may be null if another thread has called
                    // cancelledKey()
                    if (socketWrapper == null) {
                        iterator.remove();
                    } else {
                        iterator.remove();
                        //sock處理
                        processKey(sk,socketWrapper);
                    }
                }
        //省略部分程式碼
        }    
複製程式碼

這個就是通過selector把之前註冊的事件取出來,從而完成了呼叫。

//9.類: NioEndPoint內部類 Poller  implements Runnable     
protected void processKey(SelectionKey sk,NioSocketWrapper socketWrapper) {
         //省略大部分程式碼
           processSocket(socketWrapper,SocketEvent.OPEN_WRITE,true)
    
}
       
//10.類:AbstractEndPoint        
public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event,boolean dispatch) {
       //省略部分程式碼
           Executor executor = getExecutor();
           if (dispatch && executor != null) {
               executor.execute(sc);
           } else {
               sc.run();
           }
      
       return true;
   }  
//11.類:SocketProcessorBase  implements Runnable   
public final void run() {
       synchronized (socketWrapper) {
           // It is possible that processing may be triggered for read and
           // write at the same time. The sync above makes sure that processing
           // does not occur in parallel. The test below ensures that if the
           // first event to be processed results in the socket being closed,// the subsequent events are not processed.
           if (socketWrapper.isClosed()) {
               return;
           }
           doRun();
       }
   }
   
//類:12.NioEndPoint   extends AbstractJsseEndpoint<NioChannel,SocketChannel> 
protected void doRun() {
       //省略部分程式碼
               if (handshake == 0) {
                   SocketState state = SocketState.OPEN;
                   // Process the request from this socket
                   if (event == null) {
                       state = getHandler().process(socketWrapper,SocketEvent.OPEN_READ);
                   } else {
                       state = getHandler().process(socketWrapper,event);
                   }
                   if (state == SocketState.CLOSED) {
                       poller.cancelledKey(key,socketWrapper);
                   }
               }

       } 
       
複製程式碼

Poller呼叫的run方法或者用Executor執行緒池去執行run(),最終呼叫都是各個子EndPoint中的doRun()方法,最終會取一個Handler去處理socketWrapper。繼續看原始碼:

//類:13.AbstractProtocol內部類ConnectionHandler implements AbstractEndpoint.Handler<S>
 public SocketState process(SocketWrapperBase<S> wrapper,SocketEvent status) {
            //省略部分程式碼
    
            state = processor.process(wrapper,status);
      
            return SocketState.CLOSED;
        }
        
//類:14.AbstractProcessorLight implements Processor 
public SocketState process(SocketWrapperBase<?> socketWrapper,SocketEvent status)
            throws IOException {
            //省略部分程式碼
           
            state = service(socketWrapper);
            
        return state;
    }
複製程式碼

這部分原始碼錶明最終呼叫的process是通過一個Processor介面的實現類來完成的,這裡最終也是會呼叫到各個子類中,那麼這裡的處理器其實就是處理應用協議,我們可以檢視AbstractProcessorLight的實現類,分別有AjpProcessorHttp11ProcessorStreamProcessor,分別代表tomcat支援三種應用層協議,分別是:

這裡我們以常用的HTTP1.1為例,繼續看原始碼:

//類:15. Http11Processor extends AbstractProcessor
public SocketState service(SocketWrapperBase<?> socketWrapper)
        throws IOException {
        //省略大部分程式碼
             getAdapter().service(request,response);
        //省略大部分程式碼   
        } 
//類:16   CoyoteAdapter implements Adapter
public void service(org.apache.coyote.Request req,org.apache.coyote.Response res)
            throws Exception {

        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
        postParseSuccess = postParseRequest(req,request,res,response);
            if (postParseSuccess) {
                //check valves if we support async
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                // Calling the container
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request,response);
            }
            
    }
複製程式碼

這裡我們發現協議處理器最終會呼叫介面卡(CoyoteAdapter),而介面卡最終的工作是轉換RequestResponse物件為HttpServletRequestHttpServletResponse,從而可以去呼叫容器,到這裡整個聯結器的流程和作用我們就已經分析完了。

小結

那麼我們來回憶下整個流程,我畫了一張時序圖來說明:

這張圖包含了兩個流程,一個是元件的初始化,一個是呼叫的流程。聯結器(Connector)主要初始化了兩個元件,ProtcoHandlerEndPoint,但是我們從程式碼結構發現,他們兩個是父子關係,也就是說ProtcoHandler包含了EndPoint。後面的流程就是各個子元件的呼叫鏈關係,總結來說就是Acceptor負責接收請求,然後註冊到PollerPoller負責處理請求,然後呼叫processor處理器來處理,最後把請求轉成符合Servlet規範的requestresponse去呼叫容器(Container)。

我們流程梳理清楚了,接下來我們來結構化的梳理下:

回到聯結器(Connector)是原始碼,我們發現,上述說的模組只有ProtocolHandlerAdapter兩個屬於聯結器中,也就是說,聯結器只包含了這兩大子模組,那麼後續的EndPointAcceptorPollerProcessor都是ProtocolHandler的子模組。 而AcceptorPoller兩個模組的核心功能都是在EndPoint 中完成的,所以是其子模組,而Processor比較獨立,所以它和EndPoint是一個級別的子模組。

我們用圖來說明下上述的關係:

根據上圖我們可以知道,聯結器主要負責處理連線請求,然後通過介面卡呼叫容器。那麼具體流程細化可以如下:

  • Acceptor監聽網路請求,獲取請求。
  • Poller獲取到監聽的請求提交執行緒池進行處理。
  • Processor根據具體的應用協議(HTTP/AJP)來生成Tomcat Request物件。
  • Adapter把Request物件轉換成Servlet標準的Request物件,呼叫容器。

總結

我們從聯結器的原始碼,一步一步解析,分析了聯結器主要包含了兩大模組,ProtocolHandlerAdapterProtocolHandler主要包含了Endpoint模組和Processor模組。Endpoint模組主要的作用是連線的處理,它委託了Acceptor子模組進行連線的監聽和註冊,委託子模組Poller進行連線的處理;而Processor模組主要是應用協議的處理,最後提交給Adapter進行物件的轉換,以便可以呼叫容器(Container)。另外我們也在分析原始碼的過程中補充了一些額外知識點:

  • 當前Tomcat版本支援的IO模型為:APR模型、NIO模型、NIO.2模型
  • Tomcat支援的協議是AJP和HTTP,其中HTTP又分為HTTP1.1和HTTP2.0

版權宣告:原創文章,轉載請註明出處。