NIO之路--MINA框架原始碼解析
MINA框架是基於NIO的非同步IO框架,上一文已經對MINA的理論及實踐做了分析,本文將對於MINA的整體原始碼實現進行分析。
通過MINA的實際案例可以發現,MINA的IO實現相比於NIO的使用要簡單很多,因為不需要關心IO的具體實現,只需要關心具體的IO資料即可。MINA服務端整體步驟一共就四步:
1、建立IoService:初始化IoService,服務端就是建立IoAcceptor物件,客戶端就是建立IoConnector物件
2、新增IoFilter:新增IO過濾器,每個IoService內部都有一個IO過濾器鏈IoFIlterChain,呼叫addBefore或addLast等方法將IO過濾器新增到過濾器鏈上
3、設定IoHandler:給IoService設定IO資料處理器IoHandler物件,用來處理器具體的IO業務資料
4、繫結監聽埠號:呼叫IoService的bind方法監聽服務端需要監聽的埠號,並開啟Selector監聽客戶端連線
一、初始化IoService
伺服器建立IoService是直接建立IoService的子介面IoAcceptor,實現類為NioSocketAcceptor,例項化程式碼如下:
1 /** 建立預設例項*/ 2 IoAcceptor acceptor1 = new NioSocketAcceptor(); 3 4 /** 建立指定數量IoProcessor的例項*/ 5 IoAcceptor acceptor2 = new NioSocketAcceptor(2); 6 7 /** 建立指定執行緒池的例項*/ 8 Executor executor = Executors.newFixedThreadPool(4); 9 IoAcceptor acceptor3 = new NioSocketAcceptor(executor, new SimpleIoProcessorPool<>(NioProcessor.class));
雖然NioSocketAcceptor是實現了IoAcceptor介面,但是並不是直接實現的,而是繼承了抽象的實現了IoAcceptor介面的父類AbstractPollingIoAcceptor,按預設的構造方法為例,初始化程式碼如下:
1 /** 無參建構函式*/ 2 public NioSocketAcceptor() { 3 /** 呼叫父類建構函式*/ 4 super(new DefaultSocketSessionConfig(), NioProcessor.class); 5 ((DefaultSocketSessionConfig) getSessionConfig()).init(this); 6 } 7 8 /** AbstractPollingIoAcceptor建構函式 9 * @param sessionConfig:IoSession的全域性配置 10 * @param processorClass:IoProcessor的Class 11 * */ 12 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<S>> processorClass) { 13 /** 呼叫過載建構函式*/ 14 this(sessionConfig, null, new SimpleIoProcessorPool<S>(processorClass), true, null); 15 } 16 17 /** AbstractPollingIoAcceptor建構函式 18 * @param sessionConfig:IoSession的全域性配置 19 * @param executor:執行緒池 20 * @param processor:IoProcessor物件 21 * @param createdProcessor : 是否建立Processor 22 * @param selectorProvider : SelectorProvider物件,用於建立Selector 23 * */ 24 private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<S> processor, 25 boolean createdProcessor, SelectorProvider selectorProvider) { 26 /** 1.呼叫父親AbstractIoAcceptor的建構函式*/ 27 super(sessionConfig, executor); 28 29 if (processor == null) { 30 throw new IllegalArgumentException("processor"); 31 } 32 /** 2.設定屬性processor、createdProcessor值*/ 33 this.processor = processor; 34 this.createdProcessor = createdProcessor; 35 36 try { 37 /** 3.根據SelectorProvider物件初始化Selector,和NIO的根據SelectorProvider獲取Selector物件邏輯一樣 */ 38 init(selectorProvider); 39 40 //標記當前IoAcceptor的Selector已經建立完成,可以接收客戶端的連線請求了 41 selectable = true; 42 } catch (RuntimeException e) { 43 throw e; 44 } catch (Exception e) { 45 throw new RuntimeIoException("Failed to initialize.", e); 46 } finally { 47 if (!selectable) { 48 try { 49 destroy(); 50 } catch (Exception e) { 51 ExceptionMonitor.getInstance().exceptionCaught(e); 52 } 53 } 54 } 55 } 56 57 /** AbstractIoAcceptor建構函式*/ 58 protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) { 59 /** 呼叫父類AbstractIoService建構函式 */ 60 super(sessionConfig, executor); 61 defaultLocalAddresses.add(null); 62 } 63 64 /** AbstractIoService建構函式 */ 65 protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) { 66 /** 引數校驗*/ 67 if (sessionConfig == null) { 68 throw new IllegalArgumentException("sessionConfig"); 69 } 70 71 if (getTransportMetadata() == null) { 72 throw new IllegalArgumentException("TransportMetadata"); 73 } 74 75 if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(sessionConfig.getClass())) { 76 throw new IllegalArgumentException("sessionConfig type: " + sessionConfig.getClass() + " (expected: " 77 + getTransportMetadata().getSessionConfigType() + ")"); 78 } 79 80 /** 建立IoServiceListenerSupport物件,IoServiceListenerSupport內部有一個IoServiceListener的列表 81 * 將當前的IoServerListener物件新增到IoServiceListenerSupport的列表中 82 * */ 83 listeners = new IoServiceListenerSupport(this); 84 listeners.add(serviceActivationListener); 85 86 /** 設定屬性 sessionConfig*/ 87 this.sessionConfig = sessionConfig; 88 89 //載入異常監聽器ExceptionMonitor物件,提前載入防止在使用的時候還沒有初始化 90 ExceptionMonitor.getInstance(); 91 /** 設定執行緒池,如果沒有自定義,就採用預設的newCachedThreadPool*/ 92 if (executor == null) { 93 this.executor = Executors.newCachedThreadPool(); 94 createdExecutor = true; 95 } else { 96 this.executor = executor; 97 createdExecutor = false; 98 } 99 100 threadName = getClass().getSimpleName() + '-' + id.incrementAndGet(); 101 }
整個的初始化過程都是在給IoAcceptor的一些屬性進行初始化,核心屬性包括sessionConfig、executor、IoProcessor等
其中SessionConfig表示IoSession的全域性屬性配置,主要配置如下:
1 public interface IoSessionConfig { 2 3 /** 4 * 獲取IoProcessor讀取資料的緩衝區大小 5 */ 6 int getReadBufferSize(); 7 8 /** 9 * 設定IoProcessor讀取資料的緩衝區大小,通常會由IoProcessor自動動態調整 10 */ 11 void setReadBufferSize(int readBufferSize); 12 13 /** 14 * 獲取IoProcessor讀取屬性的緩衝區大小的最小值 15 */ 16 int getMinReadBufferSize(); 17 18 /** 19 * 設定IoProcessor讀取屬性的緩衝區大小的最小值 20 */ 21 void setMinReadBufferSize(int minReadBufferSize); 22 23 /** 24 * 獲取IoProcessor讀取屬性的緩衝區大小的最大值 25 */ 26 int getMaxReadBufferSize(); 27 28 /** 29 * 設定IoProcessor讀取屬性的緩衝區大小的最大值 30 */ 31 void setMaxReadBufferSize(int maxReadBufferSize); 32 33 /** 34 * 獲取吞吐量(TPS)的統計間隔,單位為秒,預設是3秒 35 */ 36 int getThroughputCalculationInterval(); 37 38 /** 39 * 獲取吞吐量(TPS)的統計間隔,單位為毫秒 40 */ 41 long getThroughputCalculationIntervalInMillis(); 42 43 /** 44 * 設定吞吐量的統計間隔時間 45 */ 46 void setThroughputCalculationInterval(int throughputCalculationInterval); 47 48 /** 49 * 獲取指定狀態的空閒時間,不同狀態時間可能設定的不一樣, 型別如下: 50 * READER_IDLE : 讀資料空閒 51 * WRITER_IDLE : 寫資料空閒 52 * BOTH_IDLE : 讀寫都空閒 53 */ 54 int getIdleTime(IdleStatus status); 55 56 /** 57 * 獲取指定狀態的空閒時間,單位為毫秒 58 */ 59 long getIdleTimeInMillis(IdleStatus status); 60 61 /** 62 * 設定指定狀態的空閒時間 63 */ 64 void setIdleTime(IdleStatus status, int idleTime); 65 66 /** 67 * 獲取讀空閒時間,單位秒 68 */ 69 int getReaderIdleTime(); 70 71 /** 72 * 獲取讀空閒時間,單位毫秒 73 */ 74 long getReaderIdleTimeInMillis(); 75 76 /** 77 * 設定讀空閒時間 78 */ 79 void setReaderIdleTime(int idleTime); 80 81 /** 82 * 獲取寫空閒時間,單位秒 83 */ 84 int getWriterIdleTime(); 85 86 /** 87 * 獲取寫空閒時間,單位毫秒 88 */ 89 long getWriterIdleTimeInMillis(); 90 91 /** 92 * 設定寫空閒時間 93 */ 94 void setWriterIdleTime(int idleTime); 95 96 /** 97 * 獲取讀寫都空閒時間,單位秒 98 */ 99 int getBothIdleTime(); 100 101 /** 102 * 獲取讀寫都空閒時間,單位毫秒 103 */ 104 long getBothIdleTimeInMillis(); 105 106 /** 107 * 設定讀寫都空閒時間 108 */ 109 void setBothIdleTime(int idleTime); 110 111 /** 112 * 獲取寫超時時間,單位為秒 113 */ 114 int getWriteTimeout(); 115 116 /** 117 * 獲取寫超時時間,單位毫秒 118 */ 119 long getWriteTimeoutInMillis(); 120 121 /** 122 * 設定寫超時時間 123 */ 124 void setWriteTimeout(int writeTimeout); 125 126 /** 127 * 獲取會話讀操作是否開啟 128 */ 129 boolean isUseReadOperation(); 130 131 /** 132 * 開啟或關閉會話讀操作,如果開啟所有接收到的訊息會儲存在記憶體的BlockingQueue中,使客戶端應用可以更方便讀取接收的訊息 133 * 開啟這個選項對伺服器應用無效,並可能會導致記憶體洩漏,預設為關閉狀態 134 */ 135 void setUseReadOperation(boolean useReadOperation); 136 137 /** 全量設定為IoSessionConfig */ 138 void setAll(IoSessionConfig config); 139 }
另外一個屬性是執行緒池物件executor,如果沒有自定義執行緒池傳入的話,那麼預設會呼叫Executors.newCachedThreadPool()建立無執行緒數上限的執行緒池,所以推薦使用自定義的執行緒池,避免預設的執行緒池沒有執行緒數量限制導致執行緒過多的問題。
還有一個屬性是IoProcessor池,預設類為SimpleIoProcessorPool,SimpleIoProcessorPool內部有一個IoProcessor[] pool和一個Executor executor屬性,pool是IoProcessor陣列,儲存所有的IoProcessor,數量可以自定義通過建構函式傳入,預設的個數為當前伺服器機器的CPU個數+1,比如4核CPU那麼就會建立5個IoProcessor,另外每一個IoProcessor初始化的時候都會設定執行緒池executor屬性,如果executor沒有自定義同樣也會使用Executors.newCachedThreadPool建立。
總結:
IoService初始化的時候涉及到了兩個執行緒池,首先是IoService本身使用的執行緒池,IoService本身用於接收客戶端的連線請求,而連線請求的處理就交給了執行緒池處理;
另外IoService初始化時會建立多個IoProcessor用於處理客戶端具體的IO操作,每一個IoProcessor內部有一個Selector用於監聽客戶端IO事件,然後將IO事件交給內部的執行緒池來處理
二、新增IoFilter
IoService內部都一個過濾器鏈IoFilterChainBuilder物件,預設實現類為DefaultIoFilterChainBuilder類,新增IoFilter時主要有四個方法,分別是在過濾器鏈的不同位置插入指定過濾器,用法分別如下:
1 IoAcceptor acceptor = new NioSocketAcceptor(); 2 /** 獲取過濾器鏈 */ 3 DefaultIoFilterChainBuilder ioFilterChain = acceptor.getFilterChain(); 4 5 /** 1.在過濾器鏈的首部加入過濾器 */ 6 ioFilterChain.addFirst("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 7 /** 2.在過濾器鏈的指定過濾器前面加入過濾器 */ 8 ioFilterChain.addBefore("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 9 /** 3.在過濾器鏈的指定過濾器後面加入過濾器 */ 10 ioFilterChain.addAfter("logFilter","codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8")))); 11 /** 4.在過濾器鏈的尾部加入過濾器 */ 12 ioFilterChain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
以addFirst為例,實現邏輯如下:
1 /** 在過濾器鏈首部插入過濾器 2 * @param name:過濾器的名稱 3 * @param filter: 過濾器 4 * */ 5 public synchronized void addFirst(String name, IoFilter filter) { 6 /** 構造過濾器鏈路節點物件EntryImpl,並呼叫register方法加入到列表中 */ 7 register(0, new EntryImpl(name, filter)); 8 } 9 10 11 private final List<Entry> entries; 12 13 public DefaultIoFilterChainBuilder() { 14 entries = new CopyOnWriteArrayList<Entry>(); 15 } 16 17 /** 18 * 在指定位置插入Entry節點 19 * */ 20 private void register(int index, Entry e) { 21 if (contains(e.getName())) { 22 throw new IllegalArgumentException("Other filter is using the same name: " + e.getName()); 23 } 24 //呼叫List的add方法在指定為止插入節點 25 entries.add(index, e); 26 }
從原始碼可以看出新增IoFilter的邏輯比較簡單,過濾器鏈IoFilterChainBuilder物件內部有一個列表,用於存放所有的IoFilter,新增過濾器就是將IoFilter和名稱封裝成Entry物件新增到列表中,不同的add方法實際就是呼叫List的add(int index, Entry e)方法在指定的位置插入元素。addFirst就是在list頭部插入元素,addLast就是在list尾部插入元素,addBefore和addAfter先通過遍歷查詢指定過濾器在List中的位置,然後再將新插入的過濾器插入到指定位置
三、設定IoHandler
1 public final void setHandler(IoHandler handler) { 2 if (handler == null) { 3 throw new IllegalArgumentException("handler cannot be null"); 4 } 5 /** 判斷IoService是否活躍,主要是判斷IoServiceListenerSupport是否有活躍的IoServiceListener */ 6 if (isActive()) { 7 throw new IllegalStateException("handler cannot be set while the service is active."); 8 } 9 /** 設定IoHandler屬性*/ 10 this.handler = handler; 11 }
設定IoHandler的邏輯比較簡單,就是給IoService內部的handler屬性賦值
四、繫結主機
前面三個步驟都是在初始化伺服器並設定各種屬性,接下來就是繫結監聽的邏輯,原始碼如下:
1 public final void bind(SocketAddress localAddress) throws IOException { 2 if (localAddress == null) { 3 throw new IllegalArgumentException("localAddress"); 4 } 5 /** 建立地址列表,將傳入的SocketAddress新增到列表中*/ 6 List<SocketAddress> localAddresses = new ArrayList<>(1); 7 localAddresses.add(localAddress); 8 /** 內部方法執行繫結邏輯*/ 9 bind(localAddresses); 10 }
呼叫內部過載的bind方法,原始碼如下:
1 public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException { 2 /** 引數校驗*/ 3 if (isDisposing()) { 4 throw new IllegalStateException("The Accpetor disposed is being disposed."); 5 } 6 7 if (localAddresses == null) { 8 throw new IllegalArgumentException("localAddresses"); 9 } 10 11 List<SocketAddress> localAddressesCopy = new ArrayList<>(); 12 13 for (SocketAddress a : localAddresses) { 14 checkAddressType(a); 15 localAddressesCopy.add(a); 16 } 17 18 if (localAddressesCopy.isEmpty()) { 19 throw new IllegalArgumentException("localAddresses is empty."); 20 } 21 22 boolean activate = false; 23 synchronized (bindLock) { 24 synchronized (boundAddresses) { 25 if (boundAddresses.isEmpty()) { 26 activate = true; 27 } 28 } 29 /** 判斷IoHandler是否為空*/ 30 if (getHandler() == null) { 31 throw new IllegalStateException("handler is not set."); 32 } 33 34 try { 35 /** 繫結主機地址 */ 36 Set<SocketAddress> addresses = bindInternal(localAddressesCopy); 37 synchronized (boundAddresses) { 38 boundAddresses.addAll(addresses); 39 } 40 } catch (IOException e) { 41 throw e; 42 } catch (RuntimeException e) { 43 throw e; 44 } catch (Exception e) { 45 throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e); 46 } 47 } 48 /** 啟用IoServiceListener */ 49 if (activate) { 50 getListeners().fireServiceActivated(); 51 } 52 }
雖然程式碼比較多,但是核心程式碼就只有 bindInternal這一行, bindInternal是真正的繫結主機的方法,程式碼如下:
1 /** 繫結主機*/ 2 protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception { 3 /** 建立一個Future物件,當IoAcceptor的Selector處理了該請求後會給Future物件傳送一個訊號 */ 4 AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses); 5 6 /** 新增到AcceptorOperationFuture佇列中*/ 7 registerQueue.add(request); 8 9 /** 開啟Acceptor*/ 10 startupAcceptor(); 11 12 try { 13 /** 訊號量設定為1,相當於加鎖處理*/ 14 lock.acquire(); 15 16 wakeup(); 17 } finally { 18 /** 訊號量釋放, 相當於解鎖處理*/ 19 lock.release(); 20 } 21 22 // Now, we wait until this request is completed. 23 request.awaitUninterruptibly(); 24 25 if (request.getException() != null) { 26 throw request.getException(); 27 } 28 Set<SocketAddress> newLocalAddresses = new HashSet<>(); 29 30 for (H handle : boundHandles.values()) { 31 newLocalAddresses.add(localAddress(handle)); 32 } 33 34 return newLocalAddresses; 35 }
核心邏輯是呼叫了startupAcceptor方法,程式碼如下:
1 private void startupAcceptor() throws InterruptedException { 2 if (!selectable) { 3 registerQueue.clear(); 4 cancelQueue.clear(); 5 } 6 7 /** 建立Acceptor物件, Acceptor實現了Runnable介面*/ 8 Acceptor acceptor = acceptorRef.get(); 9 10 if (acceptor == null) { 11 lock.acquire(); 12 acceptor = new Acceptor(); 13 14 if (acceptorRef.compareAndSet(null, acceptor)) { 15 /** 將實現了Runnable介面的acceptor物件放入執行緒池中執行*/ 16 executeWorker(acceptor); 17 } else { 18 lock.release(); 19 } 20 } 21 }
1 protected final void executeWorker(Runnable worker) { 2 executeWorker(worker, null); 3 } 4 5 protected final void executeWorker(Runnable worker, String suffix) { 6 String actualThreadName = threadName; 7 if (suffix != null) { 8 actualThreadName = actualThreadName + '-' + suffix; 9 } 10 executor.execute(new NamePreservingRunnable(worker, actualThreadName)); 11 }
該方法的核心邏輯是建立一個Acceptor物件,Acceptor物件是實現了Runnable介面的,所以是一個可執行邏輯。然後呼叫executeWorker方法將Acceptor物件交給執行緒池執行,而執行緒池就是初始化IoService時初始化的執行緒池,預設是Executors.newCachedThreadPool
既然Acceptor實現了Runnable介面,那麼就再看下Acceptor的具體實現邏輯:
1 private class Acceptor implements Runnable { 2 public void run() { 3 assert (acceptorRef.get() == this); 4 5 int nHandles = 0; 6 7 /** 釋放鎖*/ 8 lock.release(); 9 10 while (selectable) { 11 try { 12 /** 獲取註冊的監聽埠的數量*/ 13 nHandles += registerHandles(); 14 15 /** 呼叫Selector的select()方法,會阻塞當前執行緒 */ 16 int selected = select(); 17 18 // Now, if the number of registred handles is 0, we can 19 // quit the loop: we don't have any socket listening 20 // for incoming connection. 21 if (nHandles == 0) { 22 acceptorRef.set(null); 23 24 if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { 25 assert (acceptorRef.get() != this); 26 break; 27 } 28 29 if (!acceptorRef.compareAndSet(null, this)) { 30 assert (acceptorRef.get() != this); 31 break; 32 } 33 34 assert (acceptorRef.get() == this); 35 } 36 37 if (selected > 0) { 38 /** 處理Selector返回的所有的SelectionKey */ 39 processHandles(selectedHandles()); 40 } 41 42 // check to see if any cancellation request has been made. 43 nHandles -= unregisterHandles(); 44 } catch (ClosedSelectorException cse) { 45 // If the selector has been closed, we can exit the loop 46 ExceptionMonitor.getInstance().exceptionCaught(cse); 47 break; 48 } catch (Exception e) { 49 ExceptionMonitor.getInstance().exceptionCaught(e); 50 51 try { 52 Thread.sleep(1000); 53 } catch (InterruptedException e1) { 54 ExceptionMonitor.getInstance().exceptionCaught(e1); 55 } 56 } 57 } 58 59 /** 當銷燬IoService時, 銷燬所有的processor*/ 60 if (selectable && isDisposing()) { 61 selectable = false; 62 try { 63 if (createdProcessor) { 64 processor.dispose(); 65 } 66 } finally { 67 try { 68 synchronized (disposalLock) { 69 if (isDisposing()) { 70 destroy(); 71 } 72 } 73 } catch (Exception e) { 74 ExceptionMonitor.getInstance().exceptionCaught(e); 75 } finally { 76 disposalFuture.setDone(); 77 } 78 } 79 } 80 } 81 82 /** 83 * This method will process new sessions for the Worker class. All 84 * keys that have had their status updates as per the Selector.selectedKeys() 85 * method will be processed here. Only keys that are ready to accept 86 * connections are handled here. 87 * <p/> 88 * Session objects are created by making new instances of SocketSessionImpl 89 * and passing the session object to the SocketIoProcessor class. 90 */ 91 @SuppressWarnings("unchecked") 92 private void processHandles(Iterator<H> handles) throws Exception { 93 /** 遍歷所有SelectionKey*/ 94 while (handles.hasNext()) { 95 H handle = handles.next(); 96 handles.remove(); 97 98 /** 處理客戶端連線請求,封裝成NioSocketSession物件*/ 99 S session = accept(processor, handle); 100 101 if (session == null) { 102 continue; 103 } 104 /** 初始化客戶端Session物件, 設定上一次讀寫時間 */ 105 initSession(session, null, null); 106 107 /** 將客戶端Session新增到IoProcessor執行緒池,並分配一個IoProcessor和Session進行繫結 */ 108 session.getProcessor().add(session); 109 } 110 } 111 } 112 113 protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception { 114 115 SelectionKey key = null; 116 117 if (handle != null) { 118 key = handle.keyFor(selector); 119 } 120 /** key有效且必須觸發了OP_ACCEPT事件,其他事件不處理*/ 121 if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) { 122 return null; 123 } 124 125 /** 獲取客戶端連線的Channel*/ 126 SocketChannel ch = handle.accept(); 127 128 if (ch == null) { 129 return null; 130 } 131 /** 封裝客戶端連線為NioSocketSession物件 */ 132 return new NioSocketSession(this, processor, ch); 133 } 134 135 public NioSocketSession(IoService service, IoProcessor<NioSession> processor, SocketChannel channel) { 136 /** 呼叫父類建構函式 設定IoProcessor、IoService、SocketChannel屬性*/ 137 super(processor, service, channel); 138 config = new SessionConfigImpl(); 139 /** 將IoService的全域性SessionConfig複製給當前session的配置*/ 140 config.setAll(service.getSessionConfig()); 141 }
從程式碼中可以看出IoAcceptor的run方法基本上和NIO的伺服器啟動方法差不多,首先了呼叫Selector的select()方法獲取所有客戶端的請求連線accept事件,然後遍歷所有的SelectionKey,將客戶端的連線封裝成NioSocketSession,最後再將NioSocketSession新增到IoProcessor執行緒池中,而繫結到具體的IoProcessor的邏輯是在IoProcessor的add方法中實現的,這裡IoProcessor執行緒池是通過SimpleIoProcessorPool類實現的,原始碼如下:
1 /** 新增SocketSession*/ 2 public final void add(S session) { 3 /** 根據session獲取具體的IoProcessor,並將session新增到IoProcessor中*/ 4 getProcessor(session).add(session); 5 } 6 7 private IoProcessor<S> getProcessor(S session) { 8 IoProcessor<S> processor = (IoProcessor<S>) session.getAttribute(PROCESSOR); 9 10 if (processor == null) { 11 if (disposed || disposing) { 12 throw new IllegalStateException("A disposed processor cannot be accessed."); 13 } 14 /** 根據session的ID通過取模演算法從陣列中獲取對應的IoProcessor*/ 15 processor = pool[Math.abs((int) session.getId()) % pool.length]; 16 17 if (processor == null) { 18 throw new IllegalStateException("A disposed processor cannot be accessed."); 19 } 20 /** 給session設定IoProcessor屬性*/ 21 session.setAttributeIfAbsent(PROCESSOR, processor); 22 } 23 24 return processor; 25 }
每一個SocketSession會繫結一個IoProcessor,並會快取在session的attribute中,而獲取IoProcessor的方式則是通過取模演算法從IoProcessor陣列中獲取。
然後呼叫IoProcessor的add方法將session新增到IoProcessor中,處理IO資料的IoProcessor實現類為NioProcessor,實現的add方法原始碼如下:
1 /** 新建立的session佇列*/ 2 private final Queue<S> newSessions = new ConcurrentLinkedQueue<>(); 3 4 /** 需要移除的session佇列 */ 5 private final Queue<S> removingSessions = new ConcurrentLinkedQueue<>(); 6 7 /** 需要重新整理的session佇列*/ 8 private final Queue<S> flushingSessions = new ConcurrentLinkedQueue<>(); 9 10 public final void add(S session) { 11 if (disposed || disposing) { 12 throw new IllegalStateException("Already disposed."); 13 } 14 15 /** 將新的session加入到佇列中*/ 16 newSessions.add(session); 17 /** 開啟IoProcessor*/ 18 startupProcessor(); 19 } 20 21 private void startupProcessor() { 22 /** 原子引用型別,如果不存在就新建Processor*/ 23 Processor processor = processorRef.get(); 24 25 if (processor == null) { 26 processor = new Processor(); 27 28 if (processorRef.compareAndSet(null, processor)) { 29 /** 將可執行的Processor新增到執行緒池中執行 */ 30 executor.execute(new NamePreservingRunnable(processor, threadName)); 31 } 32 } 33 34 /** 喚醒Selector, 防止一直被阻塞*/ 35 wakeup(); 36 }
這裡將session新增到佇列中,然後開啟IoProcessor執行緒,過程和IoSession型別,都是封裝成一個NamePreservingRunnable物件交給執行緒池去執行,而這裡的執行緒池和IoService的執行緒池是獨立的,沒給IoProcessor都有一個自己的執行緒池
Processor具體的執行邏輯如下:
1 /** Processor執行緒,用於處理IO讀寫操作*/ 2 private class Processor implements Runnable { 3 public void run() { 4 assert (processorRef.get() == this); 5 //會話數量 6 int nSessions = 0; 7 lastIdleCheckTime = System.currentTimeMillis(); 8 /** 重試次數*/ 9 int nbTries = 10; 10 11 for (;;) { 12 try { 13 long t0 = System.currentTimeMillis(); 14 int selected = select(SELECT_TIMEOUT); 15 long t1 = System.currentTimeMillis(); 16 /** 計算select()方法執行的時間*/ 17 long delta = t1 - t0; 18 19 if (!wakeupCalled.getAndSet(false) && (selected == 0) && (delta < 100)) { 20 // Last chance : the select() may have been 21 // interrupted because we have had an closed channel. 22 if (isBrokenConnection()) { 23 LOG.warn("Broken connection"); 24 } else { 25 // Ok, we are hit by the nasty epoll 26 // spinning. 27 // Basically, there is a race condition 28 // which causes a closing file descriptor not to be 29 // considered as available as a selected channel, 30 // but 31 // it stopped the select. The next time we will 32 // call select(), it will exit immediately for the 33 // same 34 // reason, and do so forever, consuming 100% 35 // CPU. 36 // We have to destroy the selector, and 37 // register all the socket on a new one. 38 if (nbTries == 0) { 39 LOG.warn("Create a new selector. Selected is 0, delta = " + delta); 40 registerNewSelector(); 41 nbTries = 10; 42 } else { 43 nbTries--; 44 } 45 } 46 } else { 47 nbTries = 10; 48 } 49 50 /** 處理新新增的Session 51 * 將session對應的channel註冊到當前IoProcessor的Selector上,並監聽OP_READ事件 52 * */ 53 nSessions += handleNewSessions(); 54 55 updateTrafficMask(); 56 57 // Now, if we have had some incoming or outgoing events, 58 // deal with them 59 if (selected > 0) { 60 /** 當select()返回值大於0表示當前已經有IO事件需要處理,則執行process方法進行處理*/ 61 process(); 62 } 63 64 // Write the pending requests 65 long currentTime = System.currentTimeMillis(); 66 flush(currentTime); 67 68 /** 69 * 移除所有需要移除的session,移除之前會將所有需要傳送的資料傳送給客戶端 70 * */ 71 nSessions -= removeSessions(); 72 73 /** 更新所有session的讀空閒、寫空閒和讀寫空閒的時間值 */ 74 notifyIdleSessions(currentTime); 75 76 // Get a chance to exit the infinite loop if there are no 77 // more sessions on this Processor 78 if (nSessions == 0) { 79 processorRef.set(null); 80 81 if (newSessions.isEmpty() && isSelectorEmpty()) { 82 // newSessions.add() precedes startupProcessor 83 assert (processorRef.get() != this); 84 break; 85 } 86 87 assert (processorRef.get() != this); 88 89 if (!processorRef.compareAndSet(null, this)) { 90 // startupProcessor won race, so must exit processor 91 assert (processorRef.get() != this); 92 break; 93 } 94 95 assert (processorRef.get() == this); 96 } 97 98 /** 當IoProcessor銷燬了,則移除所有的客戶端的SocketSession*/ 99 if (isDisposing()) { 100 boolean hasKeys = false; 101 102 for (Iterator<S> i = allSessions(); i.hasNext();) { 103 IoSession session = i.next(); 104 105 if (session.isActive()) { 106 scheduleRemove((S)session); 107 hasKeys = true; 108 } 109 } 110 111 if (hasKeys) { 112 wakeup(); 113 } 114 } 115 } catch (ClosedSelectorException cse) { 116 // If the selector has been closed, we can exit the loop 117 // But first, dump a stack trace 118 ExceptionMonitor.getInstance().exceptionCaught(cse); 119 break; 120 } catch (Exception e) { 121 ExceptionMonitor.getInstance().exceptionCaught(e); 122 123 try { 124 Thread.sleep(1000); 125 } catch (InterruptedException e1) { 126 ExceptionMonitor.getInstance().exceptionCaught(e1); 127 } 128 } 129 } 130 131 try { 132 synchronized (disposalLock) { 133 if (disposing) { 134 doDispose(); 135 } 136 } 137 } catch (Exception e) { 138 ExceptionMonitor.getInstance().exceptionCaught(e); 139 } finally { 140 disposalFuture.setValue(true); 141 } 142 } 143 }
這裡程式碼比較多,但是核心的不多,主要功能如下:
1、處理所有新加入的Session,將session對應的channel註冊到Selector中,並監聽OP_READ事件
2、處理所有的需要移除的session,將session從Selector中移除監聽
3、呼叫Selector的select()方法獲取IO事件,如果存在IO事件,則呼叫process()方法進行處理
4、更新所有session的讀寫空閒時間
而處理IO事件的邏輯全部在process方法中處理,原始碼如下:
1 /** 處理IoProcessor的Selector監聽返回的所有IO事件 */ 2 private void process() throws Exception { 3 /** 遍歷所有的SelectionKey*/ 4 for (Iterator<S> i = selectedSessions(); i.hasNext();) { 5 S session = i.next(); 6 /** 處理單個客戶端session的IO事件*/ 7 process(session); 8 i.remove(); 9 } 10 } 11 /** 獲取所有的SelectionKey*/ 12 protected Iterator<NioSession> selectedSessions() { 13 return new IoSessionIterator(selector.selectedKeys()); 14 }
process方法的邏輯不多,先是獲取selector所有的IO事件SelectionKey集合,然後進行遍歷所有的SelectionKey集合,呼叫process(session)方法處理每個Session的IO事件,原始碼如下:
1 /***/ 2 private void process(S session) { 3 /** 處理讀事件 */ 4 if (isReadable(session) && !session.isReadSuspended()) { 5 /** 處理session的讀事件*/ 6 read(session); 7 } 8 9 /** 處理寫事件 */ 10 if (isWritable(session) && !session.isWriteSuspended() && session.setScheduledForFlush(true)) { 11 /** 重新整理session */ 12 flushingSessions.add(session); 13 } 14 } 15 16 private void read(S session) { 17 /** 獲取配置的讀緩衝區大小*/ 18 IoSessionConfig config = session.getConfig(); 19 int bufferSize = config.getReadBufferSize(); 20 /** 分配指定大小的緩衝區*/ 21 IoBuffer buf = IoBuffer.allocate(bufferSize); 22 23 final boolean hasFragmentation = session.getTransportMetadata().hasFragmentation(); 24 25 try { 26 int readBytes = 0; 27 int ret; 28 29 try { 30 if (hasFragmentation) { 31 /** 呼叫read方法將session中的IO資料讀取到緩衝區*/ 32 while ((ret = read(session, buf)) > 0) { 33 readBytes += ret; 34 35 if (!buf.hasRemaining()) { 36 break; 37 } 38 } 39 } else { 40 /** 呼叫read方法將session中的IO資料讀取到緩衝區*/ 41 ret = read(session, buf); 42 if (ret > 0) { 43 readBytes = ret; 44 } 45 } 46 } finally { 47 /** 從讀模式切換到寫模式 */ 48 buf.flip(); 49 } 50 51 if (readBytes > 0) { 52 IoFilterChain filterChain = session.getFilterChain(); 53 /** 將讀到的資料交給過濾器鏈進行過濾處理 */ 54 filterChain.fireMessageReceived(buf); 55 buf = null; 56 57 if (hasFragmentation) { 58 /** 判斷讀取到的資料是否小於緩衝區總大小的一半*/ 59 if (readBytes << 1 < config.getReadBufferSize()) { 60 /** 縮小緩衝區大小 */ 61 session.decreaseReadBufferSize(); 62 } else if (readBytes == config.getReadBufferSize()) { 63 /** 擴大緩衝區大小 */ 64 session.increaseReadBufferSize(); 65 } 66 } 67 } 68 69 if (ret < 0) { 70 /** 過濾器鏈關閉 */ 71 IoFilterChain filterChain = session.getFilterChain(); 72 filterChain.fireInputClosed(); 73 } 74 } catch (Exception e) { 75 if (e instanceof IOException) { 76 if (!(e instanceof PortUnreachableException) 77 || !AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass()) 78 || ((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable()) { 79 scheduleRemove(session); 80 } 81 } 82 /** 如果拋異常,則通知過濾器鏈執行異常事件*/ 83 IoFilterChain filterChain = session.getFilterChain(); 84 filterChain.fireExceptionCaught(e); 85 } 86 }
這裡邏輯比較多,但是整體邏輯不復雜,主要邏輯如下:
1、如果是可讀事件,那麼就執行read方法進行IO資料的讀取
2、根據配置的讀緩衝區大小建立IoBuffer物件進行記憶體分配
3、呼叫read方法從session中讀取資料存到IoBuffer中,讀取完成將IoBuffer從讀模式切換到寫模式
4、將讀取到的IoBuffer資料交給過濾器鏈進行所有IO過濾器進行過濾處理
5、將緩衝區大小進行擴容或降容處理
6、將連線關閉或連線異常的事件交給過濾器鏈進行對應的處理
接下來就針對每一個步驟分別進行原始碼分析
1、分配緩衝區
1 /** 記憶體分配工具 */ 2 private static IoBufferAllocator allocator = new SimpleBufferAllocator(); 3 4 /** 是否使用直接記憶體,預設為false表示使用堆內記憶體,值為true表示使用直接記憶體 */ 5 private static boolean useDirectBuffer = false; 6 7 public static IoBuffer allocate(int capacity) { 8 return allocate(capacity, useDirectBuffer); 9 } 10 11 public static IoBuffer allocate(int capacity, boolean useDirectBuffer) { 12 if (capacity < 0) { 13 throw new IllegalArgumentException("capacity: " + capacity); 14 } 15 /** 呼叫記憶體分配工具的allocate方法進行記憶體分配 */ 16 return allocator.allocate(capacity, useDirectBuffer); 17 }
分配緩衝區一共有兩個引數,分別是緩衝區的大小和是否使用直接記憶體,最終呼叫記憶體分配器的allocate方法進行記憶體分配,原始碼如下:
1 /** 建立IoBuffer物件,分配記憶體 */ 2 public IoBuffer allocate(int capacity, boolean direct) { 3 return wrap(allocateNioBuffer(capacity, direct)); 4 } 5 6 /** 建立NIO的ByteBuffer物件*/ 7 public ByteBuffer allocateNioBuffer(int capacity, boolean direct) { 8 ByteBuffer nioBuffer; 9 if (direct) { 10 /** 分配直接記憶體*/ 11 nioBuffer = ByteBuffer.allocateDirect(capacity); 12 } else { 13 /** 分配堆內記憶體*/ 14 nioBuffer = ByteBuffer.allocate(capacity); 15 } 16 return nioBuffer; 17 } 18 19 /** 將ByteBuffer物件包裝成IoBuffer物件 */ 20 public IoBuffer wrap(ByteBuffer nioBuffer) { 21 return new SimpleBuffer(nioBuffer); 22 }
這裡就回到了Java的NIO分配記憶體的邏輯了,MINA分配緩衝區的邏輯實際底層就是呼叫了Java NIO的ByteBuffer的分配邏輯,根據直接記憶體還是堆內記憶體進行記憶體分配,最終分配了之後再封裝成了IoBuffer物件。
2、IO資料讀取
1 @Override 2 protected int read(NioSession session, IoBuffer buf) throws Exception { 3 /** 從session中獲取channel*/ 4 ByteChannel channel = session.getChannel(); 5 /** 呼叫Java NIO的channel.read(ByteBuffer buffer)方法讀取資料 */ 6 return channel.read(buf.buf()); 7 }
讀取資料的邏輯也不復雜,同樣是直接呼叫了Java NIO的channel讀取Buffer資料的方式進行讀取,具體的實現邏輯可以參考Java NIO的channel讀取緩衝區資料的邏輯
3、過濾器鏈對IO資料進行過濾處理
1 public void fireMessageReceived(Object message) { 2 if (message instanceof IoBuffer) { 3 session.increaseReadBytes(((IoBuffer) message).remaining(), System.currentTimeMillis()); 4 } 5 6 callNextMessageReceived(head, session, message); 7 } 8 9 public final void increaseReadBytes(long increment, long currentTime) { 10 if (increment <= 0) { 11 return; 12 } 13 //統計已讀位元組數 14 readBytes += increment; 15 //設定上一次讀操作時間 16 lastReadTime = currentTime; 17 //設定空閒為0 18 idleCountForBoth.set(0); 19 idleCountForRead.set(0); 20 21 if (getService() instanceof AbstractIoService) { 22 ((AbstractIoService) getService()).getStatistics().increaseReadBytes(increment, currentTime); 23 } 24 } 25 26 /** 通知下一個節點處理接收訊息事件 */ 27 private void callNextMessageReceived(Entry entry, IoSession session, Object message) { 28 try { 29 IoFilter filter = entry.getFilter(); 30 NextFilter nextFilter = entry.getNextFilter(); 31 /** 依次執行每一個IoFilter的messageReceived方法 */ 32 filter.messageReceived(nextFilter, session, message); 33 } catch (Exception e) { 34 fireExceptionCaught(e); 35 } catch (Error e) { 36 fireExceptionCaught(e); 37 throw e; 38 } 39 }
過濾器鏈路處理訊息時,會從過濾器鏈路的頭節點開始依次執行messageReceived方法,直到執行到最後一個過濾器,過濾器鏈路頭節點和尾節點是固定不變的,頭節點實現為HeadFilter,尾節點實現為TailFilter。
當處理讀事件時,會從HeadFilter開始處理,然後按自定義的過濾器依次執行,最後執行TailFilter的處理;而處理寫事件時,順序完全相反,會從TailFilter開始到HeadFilter結束。所以讀資料時最後會執行TailFilter的messageReceived方法,原始碼如下:
1 @Override 2 public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { 3 AbstractIoSession s = (AbstractIoSession) session; 4 5 if (!(message instanceof IoBuffer)) { 6 s.increaseReadMessages(System.currentTimeMillis()); 7 } else if (!((IoBuffer) message).hasRemaining()) { 8 s.increaseReadMessages(System.currentTimeMillis()); 9 } 10 11 // Update the statistics 12 if (session.getService() instanceof AbstractIoService) { 13 ((AbstractIoService) session.getService()).getStatistics().updateThroughput(System.currentTimeMillis()); 14 } 15 16 try { 17 /** 尾部過濾器將訊息交給業務處理器IoHandler處理 */ 18 session.getHandler().messageReceived(s, message); 19 } finally { 20 if (s.getConfig().isUseReadOperation()) { 21 s.offerReadFuture(message); 22 } 23 } 24 }
可以看出TailFilter最重要的一個步驟是需要將傳送的IO資料交給業務層處理,通過從IoSession中獲取IoHandler物件,並呼叫IoHandler的messageReceived方法交給業務層處理IO資料
4、緩衝區擴容或降容處理
1 /** 擴容讀緩衝區大小為原先的兩倍 */ 2 public final void increaseReadBufferSize() { 3 /** 擴大兩倍*/ 4 int newReadBufferSize = getConfig().getReadBufferSize() << 1; 5 if (newReadBufferSize <= getConfig().getMaxReadBufferSize()) { 6 getConfig().setReadBufferSize(newReadBufferSize); 7 } else { 8 getConfig().setReadBufferSize(getConfig().getMaxReadBufferSize()); 9 } 10 11 deferDecreaseReadBuffer = true; 12 } 13 14 /** 15 * 降容讀緩衝區大小為原先的一半 16 */ 17 public final void decreaseReadBufferSize() { 18 if (deferDecreaseReadBuffer) { 19 deferDecreaseReadBuffer = false; 20 return; 21 } 22 23 if (getConfig().getReadBufferSize() > getConfig().getMinReadBufferSize()) { 24 /** 縮小一半*/ 25 getConfig().setReadBufferSize(getConfig().getReadBufferSize() >>> 1); 26 } 27 28 deferDecreaseReadBuffer = true; 29 }
配置的讀緩衝區大小和實際的IO資料大小可能會存在偏差,所以需要動態的調整讀緩衝區的大小,避免緩衝區記憶體不足或記憶體浪費,每次擴容都會擴容到原大小的兩倍,降容也會縮小到原先的一半
5、連線關閉或連線異常事件處理
1 public void fireInputClosed() { 2 /** 獲取過濾器鏈頭節點*/ 3 Entry head = this.head; 4 callNextInputClosed(head, session); 5 } 6 7 /** 通知下一個節點處理IO輸入關閉事件*/ 8 private void callNextInputClosed(Entry entry, IoSession session) { 9 try { 10 IoFilter filter = entry.getFilter(); 11 /** 獲取下一個過濾器*/ 12 NextFilter nextFilter = entry.getNextFilter(); 13 /** 處理IO輸入關閉事件*/ 14 filter.inputClosed(nextFilter, session); 15 } catch (Throwable e) { 16 fireExceptionCaught(e); 17 } 18 } 19 20 public void fireExceptionCaught(Throwable cause) { 21 callNextExceptionCaught(head, session, cause); 22 } 23 24 private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) { 25 /** 喚醒關聯的Future*/ 26 ConnectFuture future = (ConnectFuture) session.removeAttribute(SESSION_CREATED_FUTURE); 27 if (future == null) { 28 try { 29 IoFilter filter = entry.getFilter(); 30 NextFilter nextFilter = entry.getNextFilter(); 31 /** 執行過濾的異常處理事件*/ 32 filter.exceptionCaught(nextFilter, session, cause); 33 } catch (Throwable e) { 34 LOGGER.warn("Unexpected exception from exceptionCaught handler.", e); 35 } 36 } else { 37 /** 關閉session*/ 38 if (!session.isClosing()) { 39 // Call the closeNow method only if needed 40 session.closeNow(); 41 } 42 43 future.setException(cause); 44 } 45 }
處理邏輯基本上一樣,都是先獲取過濾器鏈的頭節點,開始處理對應的事件,然後再依次呼叫下一個節點的過濾器的方法。另外最後一個過濾器TailFilter會將對應的事件會最終交給業務層處理,呼叫IoHandler的對應方法,讓業務層處理事件。
總結MINA服務端的工作機制:
1、建立IoService,每個IoService內部有一個執行緒池用於處理客戶端的連線請求,並且有一個IoProcessor池,預設數量為CPU個數+1,用於處理客戶端的IO操作
2、IoService內部有一個Selector用於監聽客戶端的連線請求,連線成功之後會建立IoSession並交給IoProcessor池處理
3、IoProcessor池根據IoSession的ID進行取模演算法選取一個IoProcessor和IoSession進行繫結,一個IoProcessor可以繫結多個IoSession,一個IoSession只可以繫結一個IoProcessor
4、IoProcessor內部也有一個Selector用來監控所有關聯的IoSession的IO操作狀態,並且有一個執行緒池用來處理IO操作
5、IoProcessor將處理的IO事件交給IoService繫結的過濾器鏈,過濾器鏈上的所有過濾器依次處理IO事件
7、過濾器鏈的最後一個過濾器為TailFilter,處理完IO事件之後將IO事件交給業務層處理器IoHandler
8、IoHandler處理業務資料,而寫資料的話順序完全相反,才業務層到過濾器鏈路,通過HeadFilter再由IoProcessor傳送出去