1. 程式人生 > 程式設計 >NIO2(AIO) 在Tomcat的應用

NIO2(AIO) 在Tomcat的應用

概述

注: NIO2(AIO) 即非同步IO

NIO2 簡介

AIONIO2其實是一回事,如同孫行者、者行孫其實都是孫猴子,只是名稱不同本質都一樣

那麼如何理解這個概念呢?舉個例子

假設一個妹子有許多的舔狗(SpareTire),如果妹子想要完成某件事最簡單、高效的方法是什麼?

答案是,舔狗那麼多,交給他們去辦就ok了。那麼狗子辦事期間,妹子會一直等待狗子把事情做好嗎?不行,這期間當然可以繼續將其他任務派發給其他狗子。當狗子辦事期間,如果有需要妹子處理的事情,通知處理一下即可。

當然狗子一般都是處理一些重活累活,比如資料拷貝、I/O啊,接收新連線啥的(太慘了)。妹子則專注於核心業務的處理。

在這個例子中,妹子相當於核心業務執行緒,主要用來處理業務邏輯,而狗子們則是(核心+I/O執行緒)的抽象。

P.S.

  • 如果你瞭解NIO2,建議你直接閱讀NIO2模型解讀章節,不需要再閱讀NIO2 DEMO章節(時間寶貴)

  • 你可以直接越過所有章節去看總結,也可以簡單閱讀附錄直接上手除錯程式碼

NIO2 DEMO

NIO2中有個核心點,就是核心負責主要負責通知程式有什麼事件,而連線的接收以及資料的拷貝還是需要程式提供執行緒來做這些事情,你可以理解為妹子(核心業務執行緒)需要提供舔狗池(執行緒池)給核心來做這些事情

talk is cheap,show me your hair

如果你想要學習一下NIO2,可以點選

學習

該原始碼的註釋為GBK編碼,如果你看到註釋為亂碼,最好將其改為GBK編碼

這是一個Demo,值得注意的是雖然該例子中並沒有顯式的建立執行緒池,這是因為如果你在open()服務端的時候,如果沒有顯示指定,系統將會預設分配給ServerSocketChannel一個執行緒池,用於事件的處理,我們可以開啟JConsole驗證一下.

channel = AsynchronousServerSocketChannel.open();
複製程式碼

JConsole中顯示的執行緒

如圖所示thread-0到thread-4就是系統預設分配的執行緒池,用來處理I/O事件。(天賜舔狗)

想象一下,如果我們在處理I/O事件的時候將所有執行緒都阻塞住了,那麼整個系統的I/O都將陷入阻塞,如下圖所示。

在有新的I/O事件到來的時候,核心會選擇一個執行緒來處理這些I/O事件,如果處理I/O的執行緒陷入阻塞,那麼來自客戶端的請求將會一直被阻塞住,無法返回。

因此處理I/O事件的執行緒最好只處理I/O事件(接收新連線、將資料從核心拷貝到執行緒中)

你可以理解為,舔狗最好只做舔狗該做的事情,即重活累活,至於核心業務或者會發生阻塞的情況的事件最好提交給妹子(業務邏輯處理執行緒池)來處理。

Tomcat NIO2 模型

關鍵類 org.apache.tomcat.util.net.Nio2Endpoint

既然是講解NIO2的處理模型,那麼我們有必要了解以下關鍵角色

  • Nio2Acceptor Acceptor並不與特定的執行緒繫結,而是當由新連線到來從執行緒池中選擇一個執行緒來執行Acceptor的程式碼,這一個過程是由底層幫我們完成的,Acceptor的主要任務是接收新連線,併為該連線註冊讀寫的處理物件

  • LimitLatch 限制連線數,在非同步I/O情況限制連線數的主要方式就是鎖阻塞用於I/O事件的執行緒池中的執行緒

  • I/O處理器 處理I/O的類,與Nio2Acceptor執行在同一個執行緒池中

ServerSocket的啟動

非同步ServerSocket啟動的流程較為枯燥,如果你不想看程式碼,以下為其啟動的流程

  • 建立執行緒池,將其包裝為AsynchronousChannelGroup
  • 開啟ServerSocket
  • 繫結埠,並設定最大連線數
    @Override
    public void bind() throws Exception {

        // 建立執行緒池
        if (getExecutor() == null) {
            createExecutor();
        }
        if (getExecutor() instanceof ExecutorService) {
            //建立用於I/O的執行緒池(需要用AsynchronousChannelGroup包裝,才能提供給AsynchronousServerSocketChanne用)
            threadGroup = AsynchronousChannelGroup.withThreadPool((ExecutorService) getExecutor());
        }
        // AsynchronousChannelGroup needs exclusive access to its executor service
        if (!internalExecutor) {
            log.warn(sm.getString("endpoint.nio2.exclusiveExecutor"));
        }
        //建立ServerSocketChannel
        serverSock = AsynchronousServerSocketChannel.open(threadGroup);
        socketProperties.setProperties(serverSock);
        InetSocketAddress addr = new InetSocketAddress(getAddress(),getPortWithOffset());
        //繫結埠並設定backlog的引數
        //backlog可以理解為當前最大待執行accept操作的連線數
        serverSock.bind(addr,getAcceptCount());

        // Initialize SSL if needed
        initialiseSsl();
    }
複製程式碼

如下圖,就是和當前非同步ServerSocketChannel繫結的執行緒池,801表示該聯結器所監聽的埠(附錄中有開啟NIO2的教程)

nio2的執行緒池

Nio2Acceptor

Nio2Acceptor主要功能接收新連線,並限制最大連線數,因為採用的是非同步I/O,所以Acceptor並不會於特定的執行緒繫結,而是當新任務需要執行的時候,從執行緒池中選一個執行任務。如下圖所示當有客戶端新連線到達時,程式會從執行緒池選擇一個執行緒來執行Nio2Acceptor的completed方法並傳入客戶端Socket開始執行新連線處理的業務邏輯

斷點

AcceptHandler的註冊

在非同步I/O中我們需要向ServerSocketChannel註冊處理Accept事件的處理器以便完成連線事件的處理 如以下程式碼所示,當tomcat啟動的時候,會開啟一個執行緒呼叫Nio2SocketAcceptor的run方法,將Nio2SocketAcceptor註冊為ServerSocketChannel的accept事件處理器

 protected class Nio2Acceptor extends Acceptor<AsynchronousSocketChannel>
        implements CompletionHandler<AsynchronousSocketChannel,Void> {
    ...
        @Override
        public void run() {
            // The initial accept will be called in a separate utility thread
            if (!isPaused()) {
                // 連線數限制,如果達到最大連線數,則呼叫此方法的執行緒會陷入等待
                try {
                    countUpOrAwaitConnection();
                } catch (InterruptedException e) {
                    // Ignore
                }
                if (!isPaused()) {
                   //將自己註冊為accept事件的處理器(注意此類實現的介面)
                    serverSock.accept(null,this);
                } else {
                    state = AcceptorState.PAUSED;
                }
            } else {
                state = AcceptorState.PAUSED;
            }
        }
    ...
}
複製程式碼

新連線的處理

當有新連線到達時,底層會從執行緒池選擇一個執行緒來執行completed方法並傳入客戶端socket,此時該方法主要的流程如下

  • 檢查是容器否仍在執行如果仍在執行則繼續流程
  • 檢查是否需要限制連線數,如果需要限制連線數,則從執行緒池中選擇一個執行緒來執行Acceptor的run方法(此方法可能會發生阻塞)
  • 以上操作均已完成則呼叫setSocketOptions方法執行後續I/O事件處理,至此新連線的接收完成
        @Override
        public void completed(AsynchronousSocketChannel socket,Void attachment) {
            // Successful accept,reset the error delay
            errorDelay = 0;
            // Continue processing the socket on the current thread
            // Configure the socket
            if (isRunning() && !isPaused()) {
                //檢查限制的最大連線數,如果沒有設定(即-1)則不進行連線數限制
                if (getMaxConnections() == -1) {
                    serverSock.accept(null,this);
                } else {
                   //由於有新連線的到達,因此需要從執行緒池選一個執行緒執行增加連線數的操作,此操作可能會發生阻塞
                    getExecutor().execute(this);
                }
                //執行後續的I/O事件處理
                if (!setSocketOptions(socket)) {
                    closeSocket(socket);
                }
            } else {
                if (isRunning()) {
                    state = AcceptorState.PAUSED;
                }
                destroySocket(socket);
            }
        }
複製程式碼

限制最大連線數的實現

由於Acceptor並不與特定的執行緒繫結,因此如果需要限制最大連線數,需要使用鎖將空閒的執行緒阻塞住,這也時為什麼需要在accept新連線的時候需要向執行緒池提交增加新連線數的任務,如下所示(也就是呼叫Nio2SocketAcceptor的run方法)

 public void completed(AsynchronousSocketChannel socket,Void attachment) {
        ...
        getExecutor().execute(this);
        ...
}
複製程式碼

除此之外,還記得在建立ServerSocketChannel的時候我們設定了backlog引數嗎?

該引數主要用於設定當前ServerSocket所允許的最大未accept的連線數,也就是說如果超過了未accept得連線數backlog所設定的值,那麼新來的連線都將會被丟棄掉。(API檔案)

I/O 事件的處理

既然是非同步I/O,那麼必然要在客戶端Socket註冊讀寫的CompletionHandler,因此setSocketOptions必然會導致這一步驟的發生,那麼此步驟發生在什麼時候呢?

經過Debug跟蹤發現setSocketOptions將會導致Nio2SocketWrapper的建立,而實際I/O流程就發生在新建Nio2SocketWrapper物件時所建立的readCompletionHandler中,以下是其程式碼

ReadCompletionHandler 用於監聽讀事件,在讀取到資料之後會呼叫processSocket方法開始資料的解析工作

        public Nio2SocketWrapper(Nio2Channel channel,final Nio2Endpoint endpoint) {
            super(channel,endpoint);
            nioChannels = endpoint.getNioChannels();
            socketBufferHandler = channel.getBufHandler();

            this.readCompletionHandler = new CompletionHandler<Integer,ByteBuffer>() {
                @Override
                public void completed(Integer nBytes,ByteBuffer attachment) {
                    if (log.isDebugEnabled()) {
                        log.debug("Socket: [" + Nio2SocketWrapper.this + "],Interest: [" + readInterest + "]");
                    }
                    readNotify = false;
                    //加鎖,其他執行緒可能會對標誌位進行修改
                    synchronized (readCompletionHandler) {
                        //nBytes表示讀取到的位元組數,如果小於0
                        //丟擲EOF異常,沒資料讀,那咋辦嗎,只好拋異常了
                        if (nBytes.intValue() < 0) {
                            failed(new EOFException(),attachment);
                        } else {
                            if (readInterest && !Nio2Endpoint.isInline()) {
                                readNotify = true;
                            } else {
                                // Release here since there will be no
                                // notify/dispatch to do the release.
                                readPending.release();
                            }
                            readInterest = false;
                        }
                    }
                    if (readNotify) {
                        //處理讀事件
                        getEndpoint().processSocket(Nio2SocketWrapper.this,SocketEvent.OPEN_READ,false);
                    }
                }
                //省略程式碼,後面太長了
                ...
複製程式碼

debug驗證一下,如下圖所示,attachment即我們所讀到的資料

斷點

注意 在debug的時候IDEA可能發出切換執行緒的請求(讀資料和之前的操作是不在一個執行緒上的,如下所示

switch thread

總結

Tomcat NIO2 模型

模型
總結如下

  • accept事件和 I/O事件共用一個執行緒池,不會和特定執行緒繫結

  • Acceptor(Nio2Acceptor) 用於接收新連線,並註冊I/O事件的處理物件

  • LimitLatch通過阻塞住執行緒池中的執行緒來實現連線數限制功能

  • I/O Handler 即在Nio2SocketWrapper註冊的讀寫處理器,有I/O事件到達時,程式會選擇一個執行緒來執行這些處理器的程式碼

  • 總體流程如下 新連線到達->選擇一個執行緒執行Nio2Acceptor的程式碼->向執行緒池中提交增加連線數的任務->註冊讀寫處理事件->I/O事件到達,選擇一個執行緒處理I/O事件

思想遷移

不要使用預設執行緒池 在非同步ServerSocketChannel建立的時候,tomcat會自己建立一個執行緒池,而不是使用預設提供的執行緒池,由於執行緒池在我們掌握之中,由此才實現了連線數限制的功能

不要阻塞I/O執行緒 I/O執行緒就要有I/O執行緒的亞子,不要在I/O執行緒執行會發生長時間阻塞的操作

附錄 如何除錯tomcat

後端程式猿都曉得,SpringBoot中內嵌了tomcat(當然還有jetty,取決於你如何選擇),因此我們可以新建一個SpringBoot應用來專門除錯學習Tomcat的原始碼。

以下為tomcat除錯的過程

  • 第一步,開啟IDEA
  • 第二步,新建SpringBoot工程
  • 第三步,在專案側邊欄Ctrl+F 查詢Tomcat的jar包

紅框標註的為tomcat的核心包

  • 第四步,/mute all,帶上耳機,打上斷點

如果你要測試Tomcat的NIO處理方式,在以下類打斷點 (如果你想要了解tomcat中NIO的處理方式,可以看看我的理解)

package org.apache.tomcat.util.net;
public class NioEndpoint extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
...
public class Poller implements Runnable {
    public void run() {
        //此方法的程式碼位於692行
    }
}
...
}
複製程式碼

如果你要測試Tomcat的NIO2的處理方式,則需要以下配置 將以下程式碼新增到你的程式碼中。(由於SpringBoot中內嵌的tomcat預設I/O方式為NIO所以我們需要通過配置增加NIO2的聯結器)

import org.apache.catalina.connector.Connector;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectorConf {
    //注意你的SpringBoot版本,此專案的版本是2.2.0,舊的版本1.5使用不同的類進行配置
    @Bean
    public TomcatServletWebServerFactory servletContainer() {
        TomcatServletWebServerFactory tomcatServletWebServerFactory =
                new TomcatServletWebServerFactory();
        tomcatServletWebServerFactory.addAdditionalTomcatConnectors(getConnector());
        return tomcatServletWebServerFactory;
    }

    private Connector getConnector() {
       // 關鍵點哦
        Connector connector = new Connector("org.apache.coyote.http11.Http11Nio2Protocol");
        //將聯結器的埠設定為801,這樣訪問801埠的就是NIO2的模式了
        connector.setPort(801);
        return connector;
    }
}
複製程式碼

org.apache.tomcat.util.net.Nio2Endpoint打上斷點就完事了

如何除錯多執行緒

在多執行緒的情況下,可能會出現進入不了斷點的情況,此時只需在斷點上右鍵選擇Thread即可,當其他執行緒到達斷點時IDEA法發出通知,如下圖所示