1. 程式人生 > >Mina框架研究

Mina框架研究

顯示當前的時間 guide forname cfa services com pri 開發 監聽端口

Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 組織一個較新的項目,它為開發高性能和高可用性的網絡應用程序提供了非常便利的框架。

這個框架的優點:

– 異步
– 無阻塞
– 事件驅動
– 支持TCP, UDP, APR, 串口…
– 通過 過濾器(Filters)實現擴展性
– 同時提供協議框架

總體框架

之前的一個項目用到了MINA,最近想再系統的整理一下,主要參考MINA 2.0 User Guide

基於MINA框架的應用程序架構應該是這樣的:

技術分享

底層是基於JAVA的NIO 1.0實現的;

其核心部分架構是這樣的:

技術分享

內部可以分為3 個層次:

I/O Service - 執行實際的I / O,可以選擇現成的Services如 (*Acceptor),也可以自己寫。

I/O Filter Chain - 這是一個由多個過濾器組成的過濾器鏈,在這個環節將字節數據轉換到特定的數據結構中(Filters/Transforms bytes into desired Data Structures and vice-versa)
I/O Handler - 實際的業務邏輯部分

Server端應用

對socket通信來說,使用比較廣泛的是基於Server端的應用,尤其是並發規模達到一定程度後,頗具挑戰性。那麽我們來看一下,基於MINA框架的Server端應用:

1、IOAcceptor 監聽指定的端口,處理新的網絡連接;一旦一個新的連接到達後,IOAcceptor 就產生一個session,後續所有從這個IP和端口發送過來的請求就將通過這個Session被處理。

2、Session創建後,後續所有的數據包都被人到過濾器鏈中,通過過濾器將原始的字節碼轉變成高層的對象,這個環節PacketEncoder/Decoder就十分有用。

3、最後數據包或對象被傳送給Handler做業務邏輯處理;

技術分享

Main.java:

public class Main {
private static final int PORT = 9123;
/**
* @param args the command line arguments
*/
public static void main(String[] args) throws IOException {

IoAcceptor acceptor = new NioSocketAcceptor();
acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName( "UTF-8" ))));

acceptor.setHandler( new TimeServerHandler() );

acceptor.getSessionConfig().setReadBufferSize( 2048 );
acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
acceptor.bind(new InetSocketAddress(PORT));

}

}

1、創建IoAcceptor;

2、加入日誌記錄和解碼的過濾器,其中日誌過濾器用SL4J庫記錄信息,而編碼過濾器則解碼所有收到的信息。使用 new TextLineCodecFactory() 發送的信息迕行編碼,返是MINA自帶的,功能有限,只能處理文本戒者String類型。

3、設置ServerHandler,這裏是一個自定義的Handler:TimeServerHandler;

4、設置Session的對應的I/O processor 讀緩存區大小2048;通常這個參數不需要設置;

5、設置空閑時間,這裏的BOTH_IDLEEADER_IDLEWRITER_IDLE. 都為10秒;

6、綁定監聽端口9123;

TimeServerHandler.java:

/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/

package minatest1;

/**
*
* @author THINKPAD
*/

import java.util.Date;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

public class TimeServerHandler extends IoHandlerAdapter
{
@Override
public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
{
cause.printStackTrace();
}

@Override
public void messageReceived( IoSession session, Object message ) throws Exception
{
String str = message.toString();
if( str.trim().equalsIgnoreCase("quit") ) {
session.close();
return;
}

Date date = new Date();
session.write( date.toString() );
System.out.println("Message written...");
}

@Override
public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
{
System.out.println( "IDLE " + session.getIdleCount( status ));
}
}

這裏主要有一下幾個主要的方法:

messageReceived(…),對接收到的消息(已經解碼)迕行下一步處理,這裏對收到的字符串進行判斷,如果是”quit”則斷開連接;否則輸出當前時間的字符串格式;

exceptionCaught(…),自定義異常處理, 要不然異常會被“吃掉”;

sessionIdle,當Session處於IDLE狀態的時候,輸出空閑狀態次數;

測試,輸入:telnet 127.0.0.1 9123,隨便輸入一串字符串,顯示當前的時間:

技術分享

IoService

IoService是一個接口,有兩種實現:IoAcceptor和IoConnector;其中IoAcceptor是針對Server端的實現,IoConnector是針對Client端的實現;IoService的職責包括:

1、監聽器管理

2、IoHandler

3、IoSession管理

4、FilterChain管理

5、Statistics管理

技術分享

IoAcceptor

主要用於創建新的連接。MINA提供了多種實現,所以幾乎不需要我們自己再去實現:

NioSocketAcceptor:無阻塞的Socket 傳輸Acceptor,針對TCP

NioDatagramAcceptor : 無阻塞的Socket 傳輸Acceptor,針對UDP

AprSocketAcceptor : 阻塞的Socket 傳輸Acceptor,基於 APR

VmPipeSocketAcceptor : the in-VM Acceptor

IoConnector

針對Client端的Socket連接,有多種實現:

NioSocketConnector : 無阻塞的Socket 傳輸Connector,針對TCP
NioDatagramConnector : 無阻塞的Socket 傳輸Connector,針對UDP
AprSocketConnector : 阻塞的Socket 傳輸Connector,基於 APR

ProxyConnector : 一個支持代理服務的 Connector ,通過截取連接的請求,並將終端指向代理設置的地址。

SerialConnector : 針對串口傳輸的Connector

VmPipeConnector : the in-VM * Connector*

Session

任何時候只要有新的連接到來,都會生成一個Session對象,並且一致保存在內存中,只到連接斷開;

Session有一系列狀態,如下:

Connected : session被創建,並有效
Idle : session至少在一個空閑周期(見配置)內沒有處理過任何請求
Idle for read : 在一個空閑周期內沒有做實際的讀操作

Idle for write : 在一個空閑周期內沒有做實際的寫操作

Idle for both : 在一個空閑周期內沒有做實際的讀和寫操作
Closing :session正在被關閉

Closed : session已經被關閉

技術分享

IoBuffer

IoBuffer是MINA內部使用的一個byte buffer,MINA並沒有直接使用NIO 的ByteBuffer。不過IoBuffer 是對 ByteBuffer 的一個封裝。IoBuffer 中的很多方法都是對 ByteBuffer 的直接繼承。只是對 ByteBuffer 添加了一些擴展了更加實用的方法。

基本用法

由於IoBuffer是對Nio的ByteBuffer 的封裝,所以基本概念還是相同的,下面簡單介紹一下:

1、capacity:該屬性描述這個緩沖區最多能緩沖多少個元素,也是Buffer最大存儲元素數,這個值是在創建Buffer的時候指定的,且不能修改。

2、Limit:在從Buffer中向Channel中寫數據時,limit變量指示了還剩多少數據可以讀取,在從Channel中讀取數據到Buffer中時,limit變量指示了還剩多少空間可供存放數據。position正常情況下小於或者等於limit。

3、Position:Buffer實際上也就是個array。當你從Channel中讀數據時,你把從Channel中讀出來的數據放進底層array,position變量用來跟蹤截止目前為止已經寫了多少數據。更精確的講,它指示如果下次寫Buffer時數據應該進入array的哪個位置。因此如果已經從Channel中讀出了3個字節,Buffer的position會被置為3,指向array中第四個位置。

4、Mark:一個可以記憶的Position位置的值,在調用reset()方法時會將緩沖區的Position重置為該索引,並非總是需要定義Mark,但是在定義Mark時,不能將其定義為負數,並且不能讓它大於Position,如果定義了Mark,則在該Position或Limit調整為小於該Mark值時,該Mark將被丟棄。

下面通過一個例子來說明:

i、初始狀態下:

此時position為0,limit和capacity都被設為9;

技術分享

ii、從Channel中讀入4個字節數據到Buffer,這時position指向4(第5個):

技術分享

iii、在做寫操作之前,我們必須調用一次flip()方法,這個方法做了兩件重要的事情:
1. 將limit設置到當前的position處。
2. 設置position為0。

技術分享

iiii、執行寫操作後;

技術分享

iv、執行clear後,position設為0,limit設為capition,mark則丟棄;

技術分享

因為IoBuffer是一個抽象類,不能直接實例化,所有使用的時候需要調用allocate方法來進行內存分配;

allocate有兩種定義:

   1: // Allocates a new buffer with a specific size, defining its type (direct or heap)
   2: public static IoBuffer allocate(int capacity, boolean direct)
   3:  
   4: // Allocates a new buffer with a specific size
   5: public static IoBuffer allocate(int capacity)

這裏:

capacity:buffer的大小;

direct:如果為true,則得到direct buffer,如果為false,則得到heap buffer

direct buffer和heap buffer的區別分析:

Direct Buffer不是分配在堆上的,它不被GC直接管理(但Direct Buffer的JAVA對象是歸GC管理的,只要GC回收了它的JAVA對象,操作系統才會釋放Direct Buffer所申請的空間),它似乎給人感覺是“內核緩沖區(buffer in kernel)”。Heap Buffer則是分配在堆上的,或者我們可以簡單理解為Heap Buffer就是byte[]數組的一種封裝形式。當我們把一個Heap Buffer寫入Channel的時候,實際上底層實現會先構建一個臨時的Direct Buffer,然後把Heap Buffer的內容復制到這個臨時的Direct Buffer上,再把這個Direct Buffer寫出去。因此把一個Direct Buffer寫入一個Channel的速度要比把一個Heap Buffer寫入一個Channel的速度要快。但是Direct Buffer創建和銷毀的代價很高,所以要用在盡可能重用的地方。

public static IoBuffer allocate(int capacity)的用法:
   1: // 設置Allocates分配的默認類型,這裏設為heap buffer.
   2:  IoBuffer.setUseDirectBuffer(false);
   3:  // 返回一個新的heap buffer.
   4:  IoBuffer buf = IoBuffer.allocate(1024);

IoBuffer允許生成一個自動擴展的buffer(這也是沒有選擇使用NIO的ByteBuffer的原因之一);通過設置AutoExpand屬性即可:

   1: IoBuffer buffer = IoBuffer.allocate(8);
   2: buffer.setAutoExpand(true);
   3:  
   4: buffer.putString("12345678", encoder);
   5:        
   6: // Add more to this buffer
   7: buffer.put((byte)10);

Mina框架研究