1. 程式人生 > >學習tomcat-如何建立連線,處理請求

學習tomcat-如何建立連線,處理請求

## tomcat如何建立連線,處理請求 學習探討tomcat如何建立網路連線協議,並處理客戶端過來的請求 ### 建立http網路連線,指定通訊協議 tomcat在建立時,會建立連線物件,負責處理客戶端的請求,基於socket connector 連線 protocol 協議 endpoint終端 socket插座,埠連線 建立初始化 connector -> protocol -> endpoint -> socket 接收請求建立任務 acceptor.socket.acceptor()-> ​ socketWrapper(攜帶通訊資訊) ​ -> poller(socketWrapper) ​ -> execute(socketWrapper) 建立執行緒 #### 建立聯結器 Conector類 `org.apache.catalina.connector.Connector` 空參構造connector() -> connector(http/1.1) ```java /** * Defaults to using HTTP/1.1 NIO implementation. */ public Connector() { this("HTTP/1.1"); } ``` #### 指定網路連線協議http11 類 `org.apache.coyote.http11.Http11NioProtocol` -> new Http11NioProtocol() ```java public Http11NioProtocol() { super(new NioEndpoint()); } ``` #### 指定服務終端處理模型非阻塞nio 類 `org.apache.tomcat.util.net.NioEndpoint` -> new NioEndPoint() 建立之後如何被啟動?見springboot啟動tomcat方式 #### 終端處理執行緒和執行緒池初始化 啟動之後 NioEndpoint執行bind()方法, 一些初始化,繫結埠 ```java @Override public void bind() throws Exception { initServerSocket(); setStopLatch(new CountDownLatch(1)); // Initialize SSL if needed initialiseSsl(); selectorPool.open(getName()); } //socket相關 initServerSocket()具體如下 // Separated out to make it easier for folks that extend NioEndpoint to // implement custom [server]sockets protected void initServerSocket() throws Exception { //....... //根據平臺不同,反回具體底層類物件(windows,linux,unix) serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); //繫結地址和埠號 InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); serverSock.socket().bind(addr,getAcceptCount()); //....... } ``` NioEndpoint初始化之後,呼叫start()執行startInternal() 程式碼如下 ```java // Create worker collection if (getExecutor() == null) { //建立執行緒池 createExecutor(); } initializeConnectionLatch(); // Start poller thread // 建立客戶端佇列(客戶端過來的請求) poller = new Poller(); Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); //建立接收遠端請求執行緒 startAcceptorThread(); ``` ##### 初始化執行緒池配置 -> createExecutor() 用於處理使用者請求 指定 備用執行緒,對大執行緒數,佇列型別,超時時間,和執行緒工廠 ```java public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); } ``` ##### 建立Poller執行緒 ```java poller = new Poller(); Thread pollerThread = new Thread(poller, getName() + "-ClientPoller"); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); ``` ##### 建立Acceptor執行緒 ```java protected void startAcceptorThread() { acceptor = new Acceptor<>(this); String threadName = getName() + "-Acceptor"; acceptor.setThreadName(threadName); Thread t = new Thread(acceptor, threadName); t.setPriority(getAcceptorThreadPriority()); t.setDaemon(getDaemon()); t.start(); } ``` #### 處理請求的相關物件(執行緒) ##### Acceptor 類 `org.apache.tomcat.util.net.Acceptor` Acceptor 負責迴圈等待遠端請求,將請求以socket形式攜帶資訊,呼叫setSocketOptions()將socket包裝配置為socketWrapper, setSocketOptions: 對socket包裝處理配置,使用poller物件註冊到佇列,讓poller執行緒做後續的處理 Acceptor 類的run方法: ```java public void run() { int errorDelay = 0; //......以下省略部分程式碼 try { // Loop until we receive a shutdown command // 一直迴圈等待遠端請求 while (!stopCalled) { // Accept the next incoming connection from the server socket // 1 接收請求 socket = endpoint.serverSocketAccept(); // setSocketOptions() will hand the socket off to // 2 處理請求,setSocketOptions() 內部呼叫poller 將新請求任務放入佇列 if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; } ``` ##### Poller 類 `org.apache.tomcat.util.net.NioEndpoint.Poller` Poller負責接收包裝後的socket請求,放入佇列, 並在run方法中迴圈去poll()請求任務,將與流讀寫有關的元件IOChannel Selector socketWrapper 繫結關聯 再通過selector獲取selectionKeys 迭代迴圈獲取對應的socket,提交任務(執行緒),執行緒讀寫處理socketWrapper等後續操作 ```java public void run() { // Loop until destroy() is called while (true) { // poller佇列任務處理 將IOChannel Selector socketWrapper 關聯 hasEvents = events(); //......省略 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // 非阻塞io api 任務處理 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); iterator.remove(); NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { // 如果有等待處理的任務,則處理 processKey(sk, socketWrapper); //processKey內部會呼叫processSocket方法,最終用執行緒池提交任務 } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); } ``` ##### 其他 events佇列 ```java private final SynchronizedQueue events = new SynchronizedQueue<>(); //事件佇列(socket請求) //註冊請求到佇列 public void rigister(final NioSocketWrapper socketWrapper) { event = new PollerEvent(socketWrapper, OP_REGISTER); addEvent(event); } private void addEvent(PollerEvent event) { events.offer(event); if (wakeupCounter.incrementAndGet() == 0) { selector.wakeup(); } } ``` events()繫結及後面的 processSocket()最終提交實際處理任務到執行緒 ```java /** * Processes events in the event queue of the Poller. * * @return true
if some events were processed, * false if queue was empty */ public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; NioSocketWrapper socketWrapper = pe.getSocketWrapper(); SocketChannel sc = socketWrapper.getSocket().getIOChannel(); int interestOps = pe.getInterestOps(); if (sc == null) { log.warn(sm.getString("endpoint.nio.nullSocketChannel")); socketWrapper.close(); } else if (interestOps == OP_REGISTER) { try { //註冊繫結 sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { final SelectionKey key = sc.keyFor(getSelector()); if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won't have been counted down when the socket // closed. socketWrapper.close(); } else { final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment(); if (attachment != null) { // We are registering the key to start with, reset the fairness counter. try { int ops = key.interestOps() | interestOps; attachment.interestOps(ops); key.interestOps(ops); } catch (CancelledKeyException ckx) { cancelledKey(key, socketWrapper); } } else { cancelledKey(key, socketWrapper); } } } if (running && !paused && eventCache != null) { pe.reset();//清空任務socketWrapper eventCache.push(pe); } } return result; } ``` setSocketOptions 中的socket任務註冊 ```java protected boolean setSocketOptions(SocketChannel socket) { NioSocketWrapper socketWrapper = null; try { // Allocate channel and wrapper NioChannel channel = null; if (nioChannels != null) { channel = nioChannels.pop(); } //...... 部分省略 NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this); socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); poller.register(socketWrapper); return true; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } if (socketWrapper == null) { destroySocket(socket); }