1. 程式人生 > 實用技巧 >NIO之路--MINA框架原始碼解析

NIO之路--MINA框架原始碼解析

MINA框架是基於NIO的非同步IO框架,上一文已經對MINA的理論及實踐做了分析,本文將對於MINA的整體原始碼實現進行分析。

通過MINA的實際案例可以發現,MINA的IO實現相比於NIO的使用要簡單很多,因為不需要關心IO的具體實現,只需要關心具體的IO資料即可。MINA服務端整體步驟一共就四步:

1、建立IoService:初始化IoService,服務端就是建立IoAcceptor物件,客戶端就是建立IoConnector物件

2、新增IoFilter:新增IO過濾器,每個IoService內部都有一個IO過濾器鏈IoFIlterChain,呼叫addBefore或addLast等方法將IO過濾器新增到過濾器鏈上

3、設定IoHandler:給IoService設定IO資料處理器IoHandler物件,用來處理器具體的IO業務資料

4、繫結監聽埠號:呼叫IoService的bind方法監聽服務端需要監聽的埠號,並開啟Selector監聽客戶端連線

一、初始化IoService

伺服器建立IoService是直接建立IoService的子介面IoAcceptor,實現類為NioSocketAcceptor,例項化程式碼如下:

1     /** 建立預設例項*/
2     IoAcceptor acceptor1 = new NioSocketAcceptor();
3 
4     /** 建立指定數量IoProcessor的例項
*/ 5 IoAcceptor acceptor2 = new NioSocketAcceptor(2); 6 7 /** 建立指定執行緒池的例項*/ 8 Executor executor = Executors.newFixedThreadPool(4); 9 IoAcceptor acceptor3 = new NioSocketAcceptor(executor, new SimpleIoProcessorPool<>(NioProcessor.class));

雖然NioSocketAcceptor是實現了IoAcceptor介面,但是並不是直接實現的,而是繼承了抽象的實現了IoAcceptor介面的父類AbstractPollingIoAcceptor,按預設的構造方法為例,初始化程式碼如下:

  1 /** 無參建構函式*/
  2     public NioSocketAcceptor() {
  3         /** 呼叫父類建構函式*/
  4         super(new DefaultSocketSessionConfig(), NioProcessor.class);
  5         ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
  6     }
  7 
  8     /** AbstractPollingIoAcceptor建構函式
  9      * @param sessionConfig:IoSession的全域性配置
 10      * @param processorClass:IoProcessor的Class
 11      * */
 12     protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) {
 13         /** 呼叫過載建構函式*/
 14         this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null);
 15     }
 16 
 17     /** AbstractPollingIoAcceptor建構函式
 18      * @param sessionConfig:IoSession的全域性配置
 19      * @param executor:執行緒池
 20      * @param processor:IoProcessor物件
 21      * @param createdProcessor : 是否建立Processor
 22      * @param selectorProvider : SelectorProvider物件,用於建立Selector
 23      * */
 24     private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor,
 25                                       boolean createdProcessor, SelectorProvider selectorProvider) {
 26         /** 1.呼叫父親AbstractIoAcceptor的建構函式*/
 27         super(sessionConfig, executor);
 28 
 29         if (processor == null) {
 30             throw new IllegalArgumentException("processor");
 31         }
 32         /** 2.設定屬性processor、createdProcessor值*/
 33         this.processor = processor;
 34         this.createdProcessor = createdProcessor;
 35 
 36         try {
 37             /** 3.根據SelectorProvider物件初始化Selector,和NIO的根據SelectorProvider獲取Selector物件邏輯一樣 */
 38             init(selectorProvider);
 39 
 40             //標記當前IoAcceptor的Selector已經建立完成,可以接收客戶端的連線請求了
 41             selectable = true;
 42         } catch (RuntimeException e) {
 43             throw e;
 44         } catch (Exception e) {
 45             throw new RuntimeIoException("Failed to initialize.", e);
 46         } finally {
 47             if (!selectable) {
 48                 try {
 49                     destroy();
 50                 } catch (Exception e) {
 51                     ExceptionMonitor.getInstance().exceptionCaught(e);
 52                 }
 53             }
 54         }
 55     }
 56 
 57     /** AbstractIoAcceptor建構函式*/
 58     protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {
 59         /** 呼叫父類AbstractIoService建構函式 */
 60         super(sessionConfig, executor);
 61         defaultLocalAddresses.add(null);
 62     }
 63 
 64     /** AbstractIoService建構函式 */
 65     protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {
 66         /** 引數校驗*/
 67         if (sessionConfig == null) {
 68             throw new IllegalArgumentException("sessionConfig");
 69         }
 70 
 71         if (getTransportMetadata() == null) {
 72             throw new IllegalArgumentException("TransportMetadata");
 73         }
 74 
 75         if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) {
 76             throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: "
 77                     + getTransportMetadata().getSessionConfigType() + ")");
 78         }
 79 
 80         /** 建立IoServiceListenerSupport物件,IoServiceListenerSupport內部有一個IoServiceListener的列表
 81          *  將當前的IoServerListener物件新增到IoServiceListenerSupport的列表中
 82          *  */
 83         listeners = new IoServiceListenerSupport(this);
 84         listeners.add(serviceActivationListener);
 85 
 86         /** 設定屬性 sessionConfig*/
 87         this.sessionConfig = sessionConfig;
 88 
 89         //載入異常監聽器ExceptionMonitor物件,提前載入防止在使用的時候還沒有初始化
 90         ExceptionMonitor.getInstance();
 91         /** 設定執行緒池,如果沒有自定義,就採用預設的newCachedThreadPool*/
 92         if (executor == null) {
 93             this.executor = Executors.newCachedThreadPool();
 94             createdExecutor = true;
 95         } else {
 96             this.executor = executor;
 97             createdExecutor = false;
 98         }
 99 
100         threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
101     }

整個的初始化過程都是在給IoAcceptor的一些屬性進行初始化,核心屬性包括sessionConfig、executor、IoProcessor等

其中SessionConfig表示IoSession的全域性屬性配置,主要配置如下:

  1 public interface IoSessionConfig {
  2 
  3         /**
  4          * 獲取IoProcessor讀取資料的緩衝區大小
  5          */
  6         int getReadBufferSize();
  7 
  8         /**
  9          * 設定IoProcessor讀取資料的緩衝區大小,通常會由IoProcessor自動動態調整
 10          */
 11         void setReadBufferSize(int readBufferSize);
 12 
 13         /**
 14          * 獲取IoProcessor讀取屬性的緩衝區大小的最小值
 15          */
 16         int getMinReadBufferSize();
 17 
 18         /**
 19          * 設定IoProcessor讀取屬性的緩衝區大小的最小值
 20          */
 21         void setMinReadBufferSize(int minReadBufferSize);
 22 
 23         /**
 24          * 獲取IoProcessor讀取屬性的緩衝區大小的最大值
 25          */
 26         int getMaxReadBufferSize();
 27 
 28         /**
 29          * 設定IoProcessor讀取屬性的緩衝區大小的最大值
 30          */
 31         void setMaxReadBufferSize(int maxReadBufferSize);
 32 
 33         /**
 34          * 獲取吞吐量(TPS)的統計間隔,單位為秒,預設是3秒
 35          */
 36         int getThroughputCalculationInterval();
 37 
 38         /**
 39          * 獲取吞吐量(TPS)的統計間隔,單位為毫秒
 40          */
 41         long getThroughputCalculationIntervalInMillis();
 42 
 43         /**
 44          * 設定吞吐量的統計間隔時間
 45          */
 46         void setThroughputCalculationInterval(int throughputCalculationInterval);
 47 
 48         /**
 49          * 獲取指定狀態的空閒時間,不同狀態時間可能設定的不一樣, 型別如下:
 50          * READER_IDLE : 讀資料空閒
 51          * WRITER_IDLE : 寫資料空閒
 52          * BOTH_IDLE : 讀寫都空閒
 53          */
 54         int getIdleTime(IdleStatus status);
 55 
 56         /**
 57          * 獲取指定狀態的空閒時間,單位為毫秒
 58          */
 59         long getIdleTimeInMillis(IdleStatus status);
 60 
 61         /**
 62          * 設定指定狀態的空閒時間
 63          */
 64         void setIdleTime(IdleStatus status, int idleTime);
 65 
 66         /**
 67          * 獲取讀空閒時間,單位秒
 68          */
 69         int getReaderIdleTime();
 70 
 71         /**
 72          * 獲取讀空閒時間,單位毫秒
 73          */
 74         long getReaderIdleTimeInMillis();
 75 
 76         /**
 77          * 設定讀空閒時間
 78          */
 79         void setReaderIdleTime(int idleTime);
 80 
 81         /**
 82          * 獲取寫空閒時間,單位秒
 83          */
 84         int getWriterIdleTime();
 85 
 86         /**
 87          * 獲取寫空閒時間,單位毫秒
 88          */
 89         long getWriterIdleTimeInMillis();
 90 
 91         /**
 92          * 設定寫空閒時間
 93          */
 94         void setWriterIdleTime(int idleTime);
 95 
 96         /**
 97          * 獲取讀寫都空閒時間,單位秒
 98          */
 99         int getBothIdleTime();
100 
101         /**
102          * 獲取讀寫都空閒時間,單位毫秒
103          */
104         long getBothIdleTimeInMillis();
105 
106         /**
107          * 設定讀寫都空閒時間
108          */
109         void setBothIdleTime(int idleTime);
110 
111         /**
112          * 獲取寫超時時間,單位為秒
113          */
114         int getWriteTimeout();
115 
116         /**
117          * 獲取寫超時時間,單位毫秒
118          */
119         long getWriteTimeoutInMillis();
120 
121         /**
122          * 設定寫超時時間
123          */
124         void setWriteTimeout(int writeTimeout);
125 
126         /**
127          * 獲取會話讀操作是否開啟
128          */
129         boolean isUseReadOperation();
130 
131         /**
132          * 開啟或關閉會話讀操作,如果開啟所有接收到的訊息會儲存在記憶體的BlockingQueue中,使客戶端應用可以更方便讀取接收的訊息
133          * 開啟這個選項對伺服器應用無效,並可能會導致記憶體洩漏,預設為關閉狀態
134          */
135         void setUseReadOperation(boolean useReadOperation);
136 
137         /** 全量設定為IoSessionConfig */
138         void setAll(IoSessionConfig config);
139     }

另外一個屬性是執行緒池物件executor,如果沒有自定義執行緒池傳入的話,那麼預設會呼叫Executors.newCachedThreadPool()建立無執行緒數上限的執行緒池,所以推薦使用自定義的執行緒池,避免預設的執行緒池沒有執行緒數量限制導致執行緒過多的問題。

還有一個屬性是IoProcessor池,預設類為SimpleIoProcessorPool,SimpleIoProcessorPool內部有一個IoProcessor[] pool和一個Executor executor屬性,pool是IoProcessor陣列,儲存所有的IoProcessor,數量可以自定義通過建構函式傳入,預設的個數為當前伺服器機器的CPU個數+1,比如4核CPU那麼就會建立5個IoProcessor,另外每一個IoProcessor初始化的時候都會設定執行緒池executor屬性,如果executor沒有自定義同樣也會使用Executors.newCachedThreadPool建立。

總結:

IoService初始化的時候涉及到了兩個執行緒池,首先是IoService本身使用的執行緒池,IoService本身用於接收客戶端的連線請求,而連線請求的處理就交給了執行緒池處理;

另外IoService初始化時會建立多個IoProcessor用於處理客戶端具體的IO操作,每一個IoProcessor內部有一個Selector用於監聽客戶端IO事件,然後將IO事件交給內部的執行緒池來處理

二、新增IoFilter

IoService內部都一個過濾器鏈IoFilterChainBuilder物件,預設實現類為DefaultIoFilterChainBuilder類,新增IoFilter時主要有四個方法,分別是在過濾器鏈的不同位置插入指定過濾器,用法分別如下:

 1         IoAcceptor acceptor = new NioSocketAcceptor();
 2         /** 獲取過濾器鏈 */
 3         DefaultIoFilterChainBuilder ioFilterChain = acceptor.getFilterChain();
 4 
 5         /** 1.在過濾器鏈的首部加入過濾器 */
 6         ioFilterChain.addFirst("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
 7         /** 2.在過濾器鏈的指定過濾器前面加入過濾器 */
 8         ioFilterChain.addBefore("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
 9         /** 3.在過濾器鏈的指定過濾器後面加入過濾器 */
10         ioFilterChain.addAfter("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
11         /** 4.在過濾器鏈的尾部加入過濾器 */
12         ioFilterChain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));

以addFirst為例,實現邏輯如下:

 1 /** 在過濾器鏈首部插入過濾器
 2      * @param name:過濾器的名稱
 3      * @param filter: 過濾器
 4      * */
 5     public synchronized void addFirst(String name, IoFilter filter) {
 6         /** 構造過濾器鏈路節點物件EntryImpl,並呼叫register方法加入到列表中 */
 7         register(0, new EntryImpl(name, filter));
 8     }
 9 
10 
11     private final List<Entry> entries;
12 
13     public DefaultIoFilterChainBuilder() {
14         entries = new CopyOnWriteArrayList<Entry>();
15     }
16 
17     /**
18      * 在指定位置插入Entry節點
19      * */
20     private void register(int index, Entry e) {
21         if (contains(e.getName())) {
22             throw new IllegalArgumentException("Other filter is using the same name: " + e.getName());
23         }
24         //呼叫List的add方法在指定為止插入節點
25         entries.add(index, e);
26     }

從原始碼可以看出新增IoFilter的邏輯比較簡單,過濾器鏈IoFilterChainBuilder物件內部有一個列表,用於存放所有的IoFilter,新增過濾器就是將IoFilter和名稱封裝成Entry物件新增到列表中,不同的add方法實際就是呼叫List的add(int index, Entry e)方法在指定的位置插入元素。addFirst就是在list頭部插入元素,addLast就是在list尾部插入元素,addBefore和addAfter先通過遍歷查詢指定過濾器在List中的位置,然後再將新插入的過濾器插入到指定位置

三、設定IoHandler

 1 public final void setHandler(IoHandler handler) {
 2         if (handler == null) {
 3             throw new IllegalArgumentException("handler cannot be null");
 4         }
 5         /** 判斷IoService是否活躍,主要是判斷IoServiceListenerSupport是否有活躍的IoServiceListener */
 6         if (isActive()) {
 7             throw new IllegalStateException("handler cannot be set while the service is active.");
 8         }
 9         /** 設定IoHandler屬性*/
10         this.handler = handler;
11     }

設定IoHandler的邏輯比較簡單,就是給IoService內部的handler屬性賦值

四、繫結主機

前面三個步驟都是在初始化伺服器並設定各種屬性,接下來就是繫結監聽的邏輯,原始碼如下:

 1 public final void bind(SocketAddress localAddress) throws IOException {
 2         if (localAddress == null) {
 3             throw new IllegalArgumentException("localAddress");
 4         }
 5         /** 建立地址列表,將傳入的SocketAddress新增到列表中*/
 6         List<SocketAddress> localAddresses = new ArrayList<>(1);
 7         localAddresses.add(localAddress);
 8         /** 內部方法執行繫結邏輯*/
 9         bind(localAddresses);
10     }

呼叫內部過載的bind方法,原始碼如下:

 1 public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException {
 2         /** 引數校驗*/
 3         if (isDisposing()) {
 4             throw new IllegalStateException("The Accpetor disposed is being disposed.");
 5         }
 6 
 7         if (localAddresses == null) {
 8             throw new IllegalArgumentException("localAddresses");
 9         }
10 
11         List<SocketAddress> localAddressesCopy = new ArrayList<>();
12 
13         for (SocketAddress a : localAddresses) {
14             checkAddressType(a);
15             localAddressesCopy.add(a);
16         }
17 
18         if (localAddressesCopy.isEmpty()) {
19             throw new IllegalArgumentException("localAddresses is empty.");
20         }
21 
22         boolean activate = false;
23         synchronized (bindLock) {
24             synchronized (boundAddresses) {
25                 if (boundAddresses.isEmpty()) {
26                     activate = true;
27                 }
28             }
29             /** 判斷IoHandler是否為空*/
30             if (getHandler() == null) {
31                 throw new IllegalStateException("handler is not set.");
32             }
33 
34             try {
35                 /**  繫結主機地址 */
36                 Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
37                 synchronized (boundAddresses) {
38                     boundAddresses.addAll(addresses);
39                 }
40             } catch (IOException e) {
41                 throw e;
42             } catch (RuntimeException e) {
43                 throw e;
44             } catch (Exception e) {
45                 throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e);
46             }
47         }
48         /** 啟用IoServiceListener */
49         if (activate) {
50             getListeners().fireServiceActivated();
51         }
52     }

雖然程式碼比較多,但是核心程式碼就只有 bindInternal這一行, bindInternal是真正的繫結主機的方法,程式碼如下:

 1 /** 繫結主機*/
 2     protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
 3         /** 建立一個Future物件,當IoAcceptor的Selector處理了該請求後會給Future物件傳送一個訊號 */
 4         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
 5 
 6         /** 新增到AcceptorOperationFuture佇列中*/
 7         registerQueue.add(request);
 8 
 9         /** 開啟Acceptor*/
10         startupAcceptor();
11 
12         try {
13             /** 訊號量設定為1,相當於加鎖處理*/
14             lock.acquire();
15 
16             wakeup();
17         } finally {
18             /** 訊號量釋放, 相當於解鎖處理*/
19             lock.release();
20         }
21 
22         // Now, we wait until this request is completed.
23         request.awaitUninterruptibly();
24 
25         if (request.getException() != null) {
26             throw request.getException();
27         }
28         Set<SocketAddress> newLocalAddresses = new HashSet<>();
29 
30         for (H handle : boundHandles.values()) {
31             newLocalAddresses.add(localAddress(handle));
32         }
33 
34         return newLocalAddresses;
35     }

核心邏輯是呼叫了startupAcceptor方法,程式碼如下:

 1 private void startupAcceptor() throws InterruptedException {
 2         if (!selectable) {
 3             registerQueue.clear();
 4             cancelQueue.clear();
 5         }
 6 
 7         /** 建立Acceptor物件, Acceptor實現了Runnable介面*/
 8         Acceptor acceptor = acceptorRef.get();
 9 
10         if (acceptor == null) {
11             lock.acquire();
12             acceptor = new Acceptor();
13 
14             if (acceptorRef.compareAndSet(null, acceptor)) {
15                 /** 將實現了Runnable介面的acceptor物件放入執行緒池中執行*/
16                 executeWorker(acceptor);
17             } else {
18                 lock.release();
19             }
20         }
21     }

 1 protected final void executeWorker(Runnable worker) {
 2         executeWorker(worker, null);
 3     }
 4 
 5     protected final void executeWorker(Runnable worker, String suffix) {
 6         String actualThreadName = threadName;
 7         if (suffix != null) {
 8             actualThreadName = actualThreadName + '-' + suffix;
 9         }
10         executor.execute(new NamePreservingRunnable(worker, actualThreadName));
11     }

該方法的核心邏輯是建立一個Acceptor物件,Acceptor物件是實現了Runnable介面的,所以是一個可執行邏輯。然後呼叫executeWorker方法將Acceptor物件交給執行緒池執行,而執行緒池就是初始化IoService時初始化的執行緒池,預設是Executors.newCachedThreadPool

既然Acceptor實現了Runnable介面,那麼就再看下Acceptor的具體實現邏輯:

  1 private class Acceptor implements Runnable {
  2         public void run() {
  3             assert (acceptorRef.get() == this);
  4 
  5             int nHandles = 0;
  6 
  7             /** 釋放鎖*/
  8             lock.release();
  9 
 10             while (selectable) {
 11                 try {
 12                     /** 獲取註冊的監聽埠的數量*/
 13                     nHandles += registerHandles();
 14 
 15                     /** 呼叫Selector的select()方法,會阻塞當前執行緒 */
 16                     int selected = select();
 17 
 18                     // Now, if the number of registred handles is 0, we can
 19                     // quit the loop: we don't have any socket listening
 20                     // for incoming connection.
 21                     if (nHandles == 0) {
 22                         acceptorRef.set(null);
 23 
 24                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
 25                             assert (acceptorRef.get() != this);
 26                             break;
 27                         }
 28 
 29                         if (!acceptorRef.compareAndSet(null, this)) {
 30                             assert (acceptorRef.get() != this);
 31                             break;
 32                         }
 33 
 34                         assert (acceptorRef.get() == this);
 35                     }
 36 
 37                     if (selected > 0) {
 38                         /** 處理Selector返回的所有的SelectionKey */
 39                         processHandles(selectedHandles());
 40                     }
 41 
 42                     // check to see if any cancellation request has been made.
 43                     nHandles -= unregisterHandles();
 44                 } catch (ClosedSelectorException cse) {
 45                     // If the selector has been closed, we can exit the loop
 46                     ExceptionMonitor.getInstance().exceptionCaught(cse);
 47                     break;
 48                 } catch (Exception e) {
 49                     ExceptionMonitor.getInstance().exceptionCaught(e);
 50 
 51                     try {
 52                         Thread.sleep(1000);
 53                     } catch (InterruptedException e1) {
 54                         ExceptionMonitor.getInstance().exceptionCaught(e1);
 55                     }
 56                 }
 57             }
 58 
 59             /** 當銷燬IoService時, 銷燬所有的processor*/
 60             if (selectable && isDisposing()) {
 61                 selectable = false;
 62                 try {
 63                     if (createdProcessor) {
 64                         processor.dispose();
 65                     }
 66                 } finally {
 67                     try {
 68                         synchronized (disposalLock) {
 69                             if (isDisposing()) {
 70                                 destroy();
 71                             }
 72                         }
 73                     } catch (Exception e) {
 74                         ExceptionMonitor.getInstance().exceptionCaught(e);
 75                     } finally {
 76                         disposalFuture.setDone();
 77                     }
 78                 }
 79             }
 80         }
 81 
 82         /**
 83          * This method will process new sessions for the Worker class.  All
 84          * keys that have had their status updates as per the Selector.selectedKeys()
 85          * method will be processed here.  Only keys that are ready to accept
 86          * connections are handled here.
 87          * <p/>
 88          * Session objects are created by making new instances of SocketSessionImpl
 89          * and passing the session object to the SocketIoProcessor class.
 90          */
 91         @SuppressWarnings("unchecked")
 92         private void processHandles(Iterator<H> handles) throws Exception {
 93             /** 遍歷所有SelectionKey*/
 94             while (handles.hasNext()) {
 95                 H handle = handles.next();
 96                 handles.remove();
 97 
 98                 /** 處理客戶端連線請求,封裝成NioSocketSession物件*/
 99                 S session = accept(processor, handle);
100 
101                 if (session == null) {
102                     continue;
103                 }
104                 /** 初始化客戶端Session物件, 設定上一次讀寫時間 */
105                 initSession(session, null, null);
106 
107                 /** 將客戶端Session新增到IoProcessor執行緒池,並分配一個IoProcessor和Session進行繫結 */
108                 session.getProcessor().add(session);
109             }
110         }
111     }
112 
113     protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
114 
115         SelectionKey key = null;
116 
117         if (handle != null) {
118             key = handle.keyFor(selector);
119         }
120         /** key有效且必須觸發了OP_ACCEPT事件,其他事件不處理*/
121         if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
122             return null;
123         }
124 
125         /** 獲取客戶端連線的Channel*/
126         SocketChannel ch = handle.accept();
127 
128         if (ch == null) {
129             return null;
130         }
131         /** 封裝客戶端連線為NioSocketSession物件 */
132         return new NioSocketSession(this, processor, ch);
133     }
134 
135     public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) {
136         /** 呼叫父類建構函式 設定IoProcessor、IoService、SocketChannel屬性*/
137         super(processor, service, channel);
138         config = new SessionConfigImpl();
139         /** 將IoService的全域性SessionConfig複製給當前session的配置*/
140         config.setAll(service.getSessionConfig());
141     }

從程式碼中可以看出IoAcceptor的run方法基本上和NIO的伺服器啟動方法差不多,首先了呼叫Selector的select()方法獲取所有客戶端的請求連線accept事件,然後遍歷所有的SelectionKey,將客戶端的連線封裝成NioSocketSession,最後再將NioSocketSession新增到IoProcessor執行緒池中,而繫結到具體的IoProcessor的邏輯是在IoProcessor的add方法中實現的,這裡IoProcessor執行緒池是通過SimpleIoProcessorPool類實現的,原始碼如下:

 1 /** 新增SocketSession*/
 2     public final void add(S session) {
 3         /** 根據session獲取具體的IoProcessor,並將session新增到IoProcessor中*/
 4         getProcessor(session).add(session);
 5     }
 6 
 7     private IoProcessor<S> getProcessor(S session) {
 8         IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR);
 9 
10         if (processor == null) {
11             if (disposed || disposing) {
12                 throw new IllegalStateException("A disposed processor cannot be accessed.");
13             }
14             /** 根據session的ID通過取模演算法從陣列中獲取對應的IoProcessor*/
15             processor = pool[Math.abs((int) session.getId()) % pool.length];
16 
17             if (processor == null) {
18                 throw new IllegalStateException("A disposed processor cannot be accessed.");
19             }
20             /** 給session設定IoProcessor屬性*/
21             session.setAttributeIfAbsent(PROCESSOR, processor);
22         }
23 
24         return processor;
25     }

每一個SocketSession會繫結一個IoProcessor,並會快取在session的attribute中,而獲取IoProcessor的方式則是通過取模演算法從IoProcessor陣列中獲取。

然後呼叫IoProcessor的add方法將session新增到IoProcessor中,處理IO資料的IoProcessor實現類為NioProcessor,實現的add方法原始碼如下:

 1 /** 新建立的session佇列*/
 2     private final Queue<S> newSessions = new ConcurrentLinkedQueue<>();
 3 
 4     /** 需要移除的session佇列 */
 5     private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>();
 6 
 7     /** 需要重新整理的session佇列*/
 8     private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>();
 9 
10     public final void add(S session) {
11         if (disposed || disposing) {
12             throw new IllegalStateException("Already disposed.");
13         }
14 
15         /** 將新的session加入到佇列中*/
16         newSessions.add(session);
17         /** 開啟IoProcessor*/
18         startupProcessor();
19     }
20 
21     private void startupProcessor() {
22         /** 原子引用型別,如果不存在就新建Processor*/
23         Processor processor = processorRef.get();
24 
25         if (processor == null) {
26             processor = new Processor();
27 
28             if (processorRef.compareAndSet(null, processor)) {
29                 /** 將可執行的Processor新增到執行緒池中執行 */
30                 executor.execute(new NamePreservingRunnable(processor, threadName));
31             }
32         }
33 
34         /** 喚醒Selector, 防止一直被阻塞*/
35         wakeup();
36     }

這裡將session新增到佇列中,然後開啟IoProcessor執行緒,過程和IoSession型別,都是封裝成一個NamePreservingRunnable物件交給執行緒池去執行,而這裡的執行緒池和IoService的執行緒池是獨立的,沒給IoProcessor都有一個自己的執行緒池

Processor具體的執行邏輯如下:

  1 /** Processor執行緒,用於處理IO讀寫操作*/
  2     private class Processor implements Runnable {
  3         public void run() {
  4             assert (processorRef.get() == this);
  5             //會話數量
  6             int nSessions = 0;
  7             lastIdleCheckTime = System.currentTimeMillis();
  8             /** 重試次數*/
  9             int nbTries = 10;
 10 
 11             for (;;) {
 12                 try {
 13                     long t0 = System.currentTimeMillis();
 14                     int selected = select(SELECT_TIMEOUT);
 15                     long t1 = System.currentTimeMillis();
 16                     /** 計算select()方法執行的時間*/
 17                     long delta = t1 - t0;
 18 
 19                     if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) {
 20                         // Last chance : the select() may have been
 21                         // interrupted because we have had an closed channel.
 22                         if (isBrokenConnection()) {
 23                             LOG.warn("Broken connection");
 24                         } else {
 25                             // Ok, we are hit by the nasty epoll
 26                             // spinning.
 27                             // Basically, there is a race condition
 28                             // which causes a closing file descriptor not to be
 29                             // considered as available as a selected channel,
 30                             // but
 31                             // it stopped the select. The next time we will
 32                             // call select(), it will exit immediately for the
 33                             // same
 34                             // reason, and do so forever, consuming 100%
 35                             // CPU.
 36                             // We have to destroy the selector, and
 37                             // register all the socket on a new one.
 38                             if (nbTries == 0) {
 39                                 LOG.warn("Create a new selector. Selected is 0, delta = " + delta);
 40                                 registerNewSelector();
 41                                 nbTries = 10;
 42                             } else {
 43                                 nbTries--;
 44                             }
 45                         }
 46                     } else {
 47                         nbTries = 10;
 48                     }
 49 
 50                     /** 處理新新增的Session
 51                      *  將session對應的channel註冊到當前IoProcessor的Selector上,並監聽OP_READ事件
 52                      * */
 53                     nSessions += handleNewSessions();
 54 
 55                     updateTrafficMask();
 56 
 57                     // Now, if we have had some incoming or outgoing events,
 58                     // deal with them
 59                     if (selected > 0) {
 60                         /** 當select()返回值大於0表示當前已經有IO事件需要處理,則執行process方法進行處理*/
 61                         process();
 62                     }
 63 
 64                     // Write the pending requests
 65                     long currentTime = System.currentTimeMillis();
 66                     flush(currentTime);
 67 
 68                     /**
 69                      * 移除所有需要移除的session,移除之前會將所有需要傳送的資料傳送給客戶端
 70                      * */
 71                     nSessions -= removeSessions();
 72 
 73                     /** 更新所有session的讀空閒、寫空閒和讀寫空閒的時間值 */
 74                     notifyIdleSessions(currentTime);
 75 
 76                     // Get a chance to exit the infinite loop if there are no
 77                     // more sessions on this Processor
 78                     if (nSessions == 0) {
 79                         processorRef.set(null);
 80 
 81                         if (newSessions.isEmpty() && isSelectorEmpty()) {
 82                             // newSessions.add() precedes startupProcessor
 83                             assert (processorRef.get() != this);
 84                             break;
 85                         }
 86 
 87                         assert (processorRef.get() != this);
 88 
 89                         if (!processorRef.compareAndSet(null, this)) {
 90                             // startupProcessor won race, so must exit processor
 91                             assert (processorRef.get() != this);
 92                             break;
 93                         }
 94 
 95                         assert (processorRef.get() == this);
 96                     }
 97 
 98                     /** 當IoProcessor銷燬了,則移除所有的客戶端的SocketSession*/
 99                     if (isDisposing()) {
100                         boolean hasKeys = false;
101 
102                         for (Iterator<S> i = allSessions(); i.hasNext();) {
103                             IoSession session = i.next();
104 
105                             if (session.isActive()) {
106                                 scheduleRemove((S)session);
107                                 hasKeys = true;
108                             }
109                         }
110 
111                         if (hasKeys) {
112                             wakeup();
113                         }
114                     }
115                 } catch (ClosedSelectorException cse) {
116                     // If the selector has been closed, we can exit the loop
117                     // But first, dump a stack trace
118                     ExceptionMonitor.getInstance().exceptionCaught(cse);
119                     break;
120                 } catch (Exception e) {
121                     ExceptionMonitor.getInstance().exceptionCaught(e);
122 
123                     try {
124                         Thread.sleep(1000);
125                     } catch (InterruptedException e1) {
126                         ExceptionMonitor.getInstance().exceptionCaught(e1);
127                     }
128                 }
129             }
130 
131             try {
132                 synchronized (disposalLock) {
133                     if (disposing) {
134                         doDispose();
135                     }
136                 }
137             } catch (Exception e) {
138                 ExceptionMonitor.getInstance().exceptionCaught(e);
139             } finally {
140                 disposalFuture.setValue(true);
141             }
142         }
143     }

這裡程式碼比較多,但是核心的不多,主要功能如下:

1、處理所有新加入的Session,將session對應的channel註冊到Selector中,並監聽OP_READ事件

2、處理所有的需要移除的session,將session從Selector中移除監聽

3、呼叫Selector的select()方法獲取IO事件,如果存在IO事件,則呼叫process()方法進行處理

4、更新所有session的讀寫空閒時間

而處理IO事件的邏輯全部在process方法中處理,原始碼如下:

 1 /** 處理IoProcessor的Selector監聽返回的所有IO事件 */
 2     private void process() throws Exception {
 3         /** 遍歷所有的SelectionKey*/
 4         for (Iterator<S> i = selectedSessions(); i.hasNext();) {
 5             S session = i.next();
 6             /** 處理單個客戶端session的IO事件*/
 7             process(session);
 8             i.remove();
 9         }
10     }
11     /** 獲取所有的SelectionKey*/
12     protected Iterator<NioSession> selectedSessions() {
13         return new IoSessionIterator(selector.selectedKeys());
14     }

process方法的邏輯不多,先是獲取selector所有的IO事件SelectionKey集合,然後進行遍歷所有的SelectionKey集合,呼叫process(session)方法處理每個Session的IO事件,原始碼如下:

 1 /***/
 2     private void process(S session) {
 3         /** 處理讀事件 */
 4         if (isReadable(session) && !session.isReadSuspended()) {
 5             /** 處理session的讀事件*/
 6             read(session);
 7         }
 8 
 9         /** 處理寫事件 */
10         if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) {
11             /** 重新整理session */
12             flushingSessions.add(session);
13         }
14     }
15 
16     private void read(S session) {
17         /** 獲取配置的讀緩衝區大小*/
18         IoSessionConfig config = session.getConfig();
19         int bufferSize = config.getReadBufferSize();
20         /** 分配指定大小的緩衝區*/
21         IoBuffer buf = IoBuffer.allocate(bufferSize);
22 
23         final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation();
24 
25         try {
26             int readBytes = 0;
27             int ret;
28 
29             try {
30                 if (hasFragmentation) {
31                     /** 呼叫read方法將session中的IO資料讀取到緩衝區*/
32                     while ((ret = read(session, buf)) > 0) {
33                         readBytes += ret;
34 
35                         if (!buf.hasRemaining()) {
36                             break;
37                         }
38                     }
39                 } else {
40                     /** 呼叫read方法將session中的IO資料讀取到緩衝區*/
41                     ret = read(session, buf);
42                     if (ret > 0) {
43                         readBytes = ret;
44                     }
45                 }
46             } finally {
47                 /** 從讀模式切換到寫模式 */
48                 buf.flip();
49             }
50 
51             if (readBytes > 0) {
52                 IoFilterChain filterChain = session.getFilterChain();
53                 /** 將讀到的資料交給過濾器鏈進行過濾處理 */
54                 filterChain.fireMessageReceived(buf);
55                 buf = null;
56 
57                 if (hasFragmentation) {
58                     /** 判斷讀取到的資料是否小於緩衝區總大小的一半*/
59                     if (readBytes << 1 < config.getReadBufferSize()) {
60                         /** 縮小緩衝區大小 */
61                         session.decreaseReadBufferSize();
62                     } else if (readBytes == config.getReadBufferSize()) {
63                         /** 擴大緩衝區大小 */
64                         session.increaseReadBufferSize();
65                     }
66                 }
67             }
68 
69             if (ret < 0) {
70                 /** 過濾器鏈關閉 */
71                 IoFilterChain filterChain = session.getFilterChain();
72                 filterChain.fireInputClosed();
73             }
74         } catch (Exception e) {
75             if (e instanceof IOException) {
76                 if (!(e instanceof PortUnreachableException)
77                         || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
78                         || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) {
79                     scheduleRemove(session);
80                 }
81             }
82             /** 如果拋異常,則通知過濾器鏈執行異常事件*/
83             IoFilterChain filterChain = session.getFilterChain();
84             filterChain.fireExceptionCaught(e);
85         }
86     }

這裡邏輯比較多,但是整體邏輯不復雜,主要邏輯如下:

1、如果是可讀事件,那麼就執行read方法進行IO資料的讀取

2、根據配置的讀緩衝區大小建立IoBuffer物件進行記憶體分配

3、呼叫read方法從session中讀取資料存到IoBuffer中,讀取完成將IoBuffer從讀模式切換到寫模式

4、將讀取到的IoBuffer資料交給過濾器鏈進行所有IO過濾器進行過濾處理

5、將緩衝區大小進行擴容或降容處理

6、將連線關閉或連線異常的事件交給過濾器鏈進行對應的處理

接下來就針對每一個步驟分別進行原始碼分析

1、分配緩衝區

 1 /** 記憶體分配工具 */
 2     private static IoBufferAllocator allocator = new SimpleBufferAllocator();
 3 
 4     /** 是否使用直接記憶體,預設為false表示使用堆內記憶體,值為true表示使用直接記憶體 */
 5     private static boolean useDirectBuffer = false;
 6 
 7     public static IoBuffer allocate(int capacity) {
 8         return allocate(capacity, useDirectBuffer);
 9     }
10 
11     public static IoBuffer allocate(int capacity, boolean useDirectBuffer) {
12         if (capacity < 0) {
13             throw new IllegalArgumentException("capacity: " + capacity);
14         }
15         /** 呼叫記憶體分配工具的allocate方法進行記憶體分配 */
16         return allocator.allocate(capacity, useDirectBuffer);
17     }

分配緩衝區一共有兩個引數,分別是緩衝區的大小和是否使用直接記憶體,最終呼叫記憶體分配器的allocate方法進行記憶體分配,原始碼如下:

 1 /** 建立IoBuffer物件,分配記憶體 */
 2     public IoBuffer allocate(int capacity, boolean direct) {
 3         return wrap(allocateNioBuffer(capacity, direct));
 4     }
 5 
 6     /** 建立NIO的ByteBuffer物件*/
 7     public ByteBuffer allocateNioBuffer(int capacity, boolean direct) {
 8         ByteBuffer nioBuffer;
 9         if (direct) {
10             /** 分配直接記憶體*/
11             nioBuffer = ByteBuffer.allocateDirect(capacity);
12         } else {
13             /** 分配堆內記憶體*/
14             nioBuffer = ByteBuffer.allocate(capacity);
15         }
16         return nioBuffer;
17     }
18 
19     /** 將ByteBuffer物件包裝成IoBuffer物件 */
20     public IoBuffer wrap(ByteBuffer nioBuffer) {
21         return new SimpleBuffer(nioBuffer);
22     }

這裡就回到了Java的NIO分配記憶體的邏輯了,MINA分配緩衝區的邏輯實際底層就是呼叫了Java NIO的ByteBuffer的分配邏輯,根據直接記憶體還是堆內記憶體進行記憶體分配,最終分配了之後再封裝成了IoBuffer物件。

2、IO資料讀取

1 @Override
2     protected int read(NioSession session, IoBuffer buf) throws Exception {
3         /** 從session中獲取channel*/
4         ByteChannel channel = session.getChannel();
5         /** 呼叫Java NIO的channel.read(ByteBuffer buffer)方法讀取資料 */
6         return channel.read(buf.buf());
7     }

讀取資料的邏輯也不復雜,同樣是直接呼叫了Java NIO的channel讀取Buffer資料的方式進行讀取,具體的實現邏輯可以參考Java NIO的channel讀取緩衝區資料的邏輯

3、過濾器鏈對IO資料進行過濾處理

 1 public void fireMessageReceived(Object message) {
 2         if (message instanceof IoBuffer) {
 3             session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis());
 4         }
 5 
 6         callNextMessageReceived(head, session, message);
 7     }
 8 
 9     public final void increaseReadBytes(long increment, long currentTime) {
10         if (increment <= 0) {
11             return;
12         }
13         //統計已讀位元組數
14         readBytes += increment;
15         //設定上一次讀操作時間
16         lastReadTime = currentTime;
17         //設定空閒為0
18         idleCountForBoth.set(0);
19         idleCountForRead.set(0);
20 
21         if (getService() instanceof AbstractIoService) {
22             ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime);
23         }
24     }
25 
26     /** 通知下一個節點處理接收訊息事件 */
27     private void callNextMessageReceived(Entry entry, IoSession session, Object message) {
28         try {
29             IoFilter filter = entry.getFilter();
30             NextFilter nextFilter = entry.getNextFilter();
31             /** 依次執行每一個IoFilter的messageReceived方法 */
32             filter.messageReceived(nextFilter, session, message);
33         } catch (Exception e) {
34             fireExceptionCaught(e);
35         } catch (Error e) {
36             fireExceptionCaught(e);
37             throw e;
38         }
39     }

過濾器鏈路處理訊息時,會從過濾器鏈路的頭節點開始依次執行messageReceived方法,直到執行到最後一個過濾器,過濾器鏈路頭節點和尾節點是固定不變的,頭節點實現為HeadFilter,尾節點實現為TailFilter。

當處理讀事件時,會從HeadFilter開始處理,然後按自定義的過濾器依次執行,最後執行TailFilter的處理;而處理寫事件時,順序完全相反,會從TailFilter開始到HeadFilter結束。所以讀資料時最後會執行TailFilter的messageReceived方法,原始碼如下:

 1  @Override
 2     public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception {
 3         AbstractIoSession s = (AbstractIoSession) session;
 4 
 5         if (!(message instanceof IoBuffer)) {
 6             s.increaseReadMessages(System.currentTimeMillis());
 7         } else if (!((IoBuffer) message).hasRemaining()) {
 8             s.increaseReadMessages(System.currentTimeMillis());
 9         }
10 
11         // Update the statistics
12         if (session.getService() instanceof AbstractIoService) {
13             ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis());
14         }
15 
16         try {
17             /** 尾部過濾器將訊息交給業務處理器IoHandler處理 */
18             session.getHandler().messageReceived(s, message);
19         } finally {
20             if (s.getConfig().isUseReadOperation()) {
21                 s.offerReadFuture(message);
22             }
23         }
24     }

可以看出TailFilter最重要的一個步驟是需要將傳送的IO資料交給業務層處理,通過從IoSession中獲取IoHandler物件,並呼叫IoHandler的messageReceived方法交給業務層處理IO資料

4、緩衝區擴容或降容處理

 1 /** 擴容讀緩衝區大小為原先的兩倍 */
 2     public final void increaseReadBufferSize() {
 3         /** 擴大兩倍*/
 4         int newReadBufferSize = getConfig().getReadBufferSize() << 1;
 5         if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) {
 6             getConfig().setReadBufferSize(newReadBufferSize);
 7         } else {
 8             getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize());
 9         }
10 
11         deferDecreaseReadBuffer = true;
12     }
13 
14     /**
15      * 降容讀緩衝區大小為原先的一半
16      */
17     public final void decreaseReadBufferSize() {
18         if (deferDecreaseReadBuffer) {
19             deferDecreaseReadBuffer = false;
20             return;
21         }
22 
23         if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) {
24             /** 縮小一半*/
25             getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1);
26         }
27 
28         deferDecreaseReadBuffer = true;
29     }

配置的讀緩衝區大小和實際的IO資料大小可能會存在偏差,所以需要動態的調整讀緩衝區的大小,避免緩衝區記憶體不足或記憶體浪費,每次擴容都會擴容到原大小的兩倍,降容也會縮小到原先的一半

5、連線關閉或連線異常事件處理

 1 public void fireInputClosed() {
 2         /** 獲取過濾器鏈頭節點*/
 3         Entry head = this.head;
 4         callNextInputClosed(head, session);
 5     }
 6 
 7     /** 通知下一個節點處理IO輸入關閉事件*/
 8     private void callNextInputClosed(Entry entry, IoSession session) {
 9         try {
10             IoFilter filter = entry.getFilter();
11             /** 獲取下一個過濾器*/
12             NextFilter nextFilter = entry.getNextFilter();
13             /** 處理IO輸入關閉事件*/
14             filter.inputClosed(nextFilter, session);
15         } catch (Throwable e) {
16             fireExceptionCaught(e);
17         }
18     }
19 
20     public void fireExceptionCaught(Throwable cause) {
21         callNextExceptionCaught(head, session, cause);
22     }
23 
24     private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) {
25         /** 喚醒關聯的Future*/
26         ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE);
27         if (future == null) {
28             try {
29                 IoFilter filter = entry.getFilter();
30                 NextFilter nextFilter = entry.getNextFilter();
31                 /** 執行過濾的異常處理事件*/
32                 filter.exceptionCaught(nextFilter, session, cause);
33             } catch (Throwable e) {
34                 LOGGER.warn("Unexpected exception from exceptionCaught handler.", e);
35             }
36         } else {
37             /** 關閉session*/
38             if (!session.isClosing()) {
39                 // Call the closeNow method only if needed
40                 session.closeNow();
41             }
42 
43             future.setException(cause);
44         }
45     }

處理邏輯基本上一樣,都是先獲取過濾器鏈的頭節點,開始處理對應的事件,然後再依次呼叫下一個節點的過濾器的方法。另外最後一個過濾器TailFilter會將對應的事件會最終交給業務層處理,呼叫IoHandler的對應方法,讓業務層處理事件。

總結MINA服務端的工作機制:

1、建立IoService,每個IoService內部有一個執行緒池用於處理客戶端的連線請求,並且有一個IoProcessor池,預設數量為CPU個數+1,用於處理客戶端的IO操作

2、IoService內部有一個Selector用於監聽客戶端的連線請求,連線成功之後會建立IoSession並交給IoProcessor池處理

3、IoProcessor池根據IoSession的ID進行取模演算法選取一個IoProcessor和IoSession進行繫結,一個IoProcessor可以繫結多個IoSession,一個IoSession只可以繫結一個IoProcessor

4、IoProcessor內部也有一個Selector用來監控所有關聯的IoSession的IO操作狀態,並且有一個執行緒池用來處理IO操作

5、IoProcessor將處理的IO事件交給IoService繫結的過濾器鏈,過濾器鏈上的所有過濾器依次處理IO事件

7、過濾器鏈的最後一個過濾器為TailFilter,處理完IO事件之後將IO事件交給業務層處理器IoHandler

8、IoHandler處理業務資料,而寫資料的話順序完全相反,才業務層到過濾器鏈路,通過HeadFilter再由IoProcessor傳送出去