1. 程式人生 > >MINA原理詳解

MINA原理詳解

main

  • 通過SocketConnector同服務器端建立連接
  • 鏈接建立之後I/O的讀寫交給了I/O Processor線程,I/O Processor是多線程的
  • 通過I/O Processor讀取的數據經過IoFilterChain裏所有配置的IoFilter,IoFilter進行消息的過濾,格式的轉換,在這個層面可以制定一些自定義的協議
  • 最後IoFilter將數據交給Handler進行業務處理,完成了整個讀取的過程
  • 寫入過程也是類似,只是剛好倒過來,通過IoSession.write寫出數據,然後Handler進行寫入的業務處理,處理完成後交給IoFilterChain,進行消息過濾和協議的轉換,最後通過I/O Processor將數據寫出到socket通道
  • IoFilterChain作為消息過濾鏈

    1. 讀取的時候是從低級協議到高級協議的過程,一般來說從byte字節逐漸轉換成業務對象的過程
    2. 寫入的時候一般是從業務對象到字節byte的過程
      IoSession貫穿整個通信過程的始終

    整個過程可以用一個圖來表現
    技術分享圖片

    消息箭頭都是有NioProcessor-N線程發起調用,默認情況下也在NioProcessor-N線程中執行

    Connector : 作為連接客戶端,SocketConector用來和服務器端建立連接,連接成功,創建IoProcessor Thread(不能超過指定的processorCount),Thread由指定的線程池進行管理,IoProcessor 利用NIO框架對IO進行處理,同時創建IoSession。連接的建立是通過Nio的SocketChannel進行。

    NioSocketConnector connector = new NioSocketConnector(processorCount); 
    ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));建立一個I/O通道 

    Acceptor :作為服務器端的連接接受者,SocketAcceptor用來監聽端口,同客戶端建立連接,連接建立之後的I/O操作全部交給IoProcessor進行處理

    IoAcceptor acceptor = new NioSocketAcceptor(); 
    acceptor.bind( new InetSocketAddress(PORT) ); 

    Protocol : 利用IoFilter,對消息進行解碼和編碼,如以下代碼通過 MyProtocolEncoder 將java對象轉成byte串,通過MyProtocalDecoder 將byte串恢復成java對象

    connector.getFilterChain().addLast("codec";,  new  ProtocolCodecFilter( new  MyProtocalFactory()));  
    ......  
    public   class  MyProtocalFactory  implements  ProtocolCodecFactory {  
         ProtocolEncoderAdapter encoder = new  MyProtocolEncoder();  
         ProtocolDecoder decoder = new  MyProtocalDecoder() ;  
         public  ProtocolDecoder getDecoder(IoSession session)  throws  Exception {  
            return  decoder;  
         }  
         public  ProtocolEncoder getEncoder(IoSession session)  throws  Exception {  
            return  encoder;  
         }  
    }  
    ......  
    public   class  MyProtocalDecoder  extends  ProtocolDecoderAdapter  {  
    
     public   void  decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)  
         throws  Exception {  
            int   id  = in.getInt();  
            int   len = in.getInt();  
            byte []  dst =  new   byte [len];  
    
            in.get(dst);  
    
            String name = new  String(dst,"GBK");  
    
            Item item = new  Item();  
            item.setId(id);  
            item.setName(name);  
            out.write(item);  
     }  
    }  
    ......  
    public   class  MyProtocolEncoder  extends  ProtocolEncoderAdapter {  
    
     public   void  encode(IoSession session, Object message,  
         ProtocolEncoderOutput out) throws  Exception {  
        Item item = (Item)message;  
        int  byteLen =  8  + item.getName().getBytes("GBK").length ;  
        IoBuffer buf = IoBuffer.allocate(byteLen);  
        buf.putInt(item.getId());  
        buf.putInt(item.getName().getBytes("GBK").length);  
        buf.put(item.getName().getBytes("GBK";));  
        buf.flip();  
        out.write(buf);  
    
     }  
    }  

    handler : 具體處理事件,事件包括:sessionCreated、sessionOpened、sessionClosed、sessionIdle、exceptionCaught、messageReceived、messageSent。
    connector.setHandler(new MyHandler());MyHandler繼承IoHandlerAdapter類或者實現IoHandler接口.事件最終由IoProcessor線程發動調用。

    Processor : I/O處理器、允許多線程讀寫,開發過程中只需要指定線程數量,Processor通過Nio框架進行I/O的續寫操作,Processor包含了Nio的Selector的引用。這點也正是mina的優勢,如果直接用Nio編寫,則需要自己編寫代碼來實現類似Processor的功能。正因為 I/O Processor是異步處理讀寫的,所以我們有時候需要識別同一個任務的消息,比如一個任務包括發送消息,接收消息,反饋消息,那麽我們需要在制定消息格式的時候,消息頭裏能包含一個能識別是同一個任務的id。
    I/O Porcessor線程數的設置 :如果是SocketConnector,則可以在構造方法中指定,如:new SocketConnector(processorCount, Executors.newCachedThreadPool());如果是SocketAcceptor,也是一樣的:SocketAcceptor acceptor = new SocketAcceptor(ProcessorCount, Executors.newCachedThreadPool());
    processorCount為最大Porcessor線程數,這個值可以通過性能測試進行調優,默認值是cpu核數量+1(Runtime.getRuntime().availableProcessors() + 1)。
    比較奇怪的是,每個IoProcessor在創建的時候會本地自己和自己建立一個連接?

    IoSession : IoSession是用來保持IoService的上下文,一個IoService在建立Connect之後建立一個IoSession(一個連接一個session),IoSession的生命周期從Connection建立到斷開為止
    IoSession做兩件事情:
    1.通過IoSession可以獲取IoService的所有相關配置對象(持有對IoService,Processor池,SocketChannel,SessionConfig和IoService.IoHandler的引用)
    2.通過IoSession.write 是數據寫出的入口

    關於線程
    ThreadModel 1.x版本的mina還有線程模式選項在2.x之後就沒有了
    1.x版本指定線程模式
    SocketConnectorConfig cfg = new SocketConnectorConfig();
    cfg.setThreadModel(ThreadModel.MANUAL);

    MINA有3種worker線程
    Acceptor、Connector、I/O processor 線程
    Acceptor Thread: 一般作為服務器端鏈接的接收線程,實現了接口IoService,線程的數量就是創建SocketAcceptor 的數量
    Connector Thread :一般作為客戶端的請求建立鏈接線程,實現了接口IoService,維持了一個和服務器端Acceptor的一個鏈接,線程數量就是創建SocketConnector 的數量
    Mina的SocketAcceptor和SocketConnector均是繼承了BaseIoService,是對IoService的兩種不同的實現
    I/O processor Thread :作為I/O真正處理的線程,存在於服務器端和客戶端,用來處理I/O的讀寫操作,線程的數量是可以配置的,默認最大數量是CPU個數+1

    服務器端:在創建SocketAcceptor的時候指定ProcessorCount

    SocketAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 

    客戶端:在創建SocketConnector 的時候指定ProcessorCount

    SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());

    I/O Processor Thread,是依附於IoService,類似上面的例子SocketConnector connector = new SocketConnector(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool());是指SocketConnector這個線程允許CPU+1個I/O Processor Thread
    NioProcessor雖然是多線程,但是對與一個連接的時候業務處理只會使用一個線程進行處理(Processor線程對於一個客戶端連接只使用一個線程NioProcessor-n)如果handler的業務比較耗時,會導致NioProcessor線程堵塞 ,在2個客戶端同時連接上來的時候會創建第2個(前提是第1個NioProcessor正在忙),創建的最大數量由Acceptor構造方法的時候指定。如果:一個客戶端連接同服務器端有很多通信,並且I/O的開銷不大,但是Handler處理的業務時間比較長,那麽需要采用獨立的線程模式,在 FilterChain的最後增加一個ExecutorFitler :

    acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool())); 

    這樣可以保證processor和handler的線程是分開的,否則:客戶端發送3個消息,而服務器對於每個消息要處理10s左右,那麽這3個消息是被串行處理,在處理第一個消息的時候,後面的消息將被堵塞,同樣反過來客戶端也有同樣的問題。

    客戶端Porcessor堵塞測試情況:

    1. 以下代碼在建立連接後連續發送了5個消息(item)
      ConnectFuture future = connector.connect( new InetSocketAddress(HOSTNAME, PORT));
      future.awaitUninterruptibly();
      session = future.getSession();
      Item item = new Item();
      item.setId(12345 );
      item.setName("hi");
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);
      session.write(item);

    2. 在handle的messageSent方法進行了延時處理,延時3秒

      public   void  messageSent(IoSession session, Object message)  throws  Exception {  
        Thread.sleep(3000 );  
        System.out.println(message);  

      }

    3. 測試結果
      5個消息是串行發送,都由同一個IoPorcessor線程處理

      session.write(item);  
      session.write(item);  
      session.write(item);  
      session.write(item);  
      session.write(item);  

      服務器端每隔3秒收到一個消息。因為調用是由IoProcessor觸發,而一個connector只會使用一個IoProcessor線程

    4. 增加ExecutorFilter,ExecutorFilter保證在處理handler的時候是獨立線程
      connector.getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()));

    5. 測試結果
      4個session.wirte變成了並行處理,服務器端同時收到了5條消息

    關註公眾號獲取springcloud dubbo 視頻
    技術分享圖片

    MINA原理詳解