1. 程式人生 > >mina框架詳解

mina框架詳解

Apache Mina Server 是一個網路通訊應用框架,也就是說,它主要是對基於TCP/IP、UDP/IP協議棧的通訊框架(當然,也可以提供JAVA 物件的序列化服務、虛擬機器管道通訊服務等),Mina 可以幫助我們快速開發高效能、高擴充套件性的網路通訊應用,Mina 提供了事件驅動、非同步(Mina 的非同步IO 預設使用的是JAVA NIO 作為底層支援)操作的程式設計模型。Mina 主要有1.x 和2.x 兩個分支,這裡我們講解最新版本2.0,如果你使用的是Mina 1.x,那麼可能會有一些功能並不適用。學習本文件,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 執行緒及併發庫(java.util.concurrent.*)的知識。Mina 同時提供了網路通訊的Server 端、Client 端的封裝,無論是哪端,Mina 在整個網通通訊結構中都處於如下的位置:可見Mina 的API 將真正的網路通訊與我們的應用程式隔離開來,你只需要關心你要傳送、接收的資料以及你的業務邏輯即可。同樣的,無論是哪端,Mina 的執行流程如下所示:

(1.) IoService:這個介面在一個執行緒上負責套接字的建立,擁有自己的Selector,監聽是否有連線被建立。

(2.) IoProcessor:這個介面在另一個執行緒上,負責檢查是否有資料在通道上讀寫,也就是說它也擁有自己的Selector,這是與我們使用JAVA NIO 編碼時的一個不同之處,通常在JAVA NIO 編碼中,我們都是使用一個Selector,也就是不區分IoService與IoProcessor 兩個功能介面。另外,IoProcessor 負責呼叫註冊在IoService 上的過濾器,並在過濾器鏈之後呼叫IoHandler。
(3.) IoFilter:這個介面定義一組攔截器,這些攔截器可以包括日誌輸出、黑名單過濾、資料的編碼(write 方向)與解碼(read 方向)等功能,其中資料的encode 與decode是最為重要的、也是你在使用Mina 時最主要關注的地方。
(4.) IoHandler

:這個介面負責編寫業務邏輯,也就是接收、傳送資料的地方。

1. 簡單的TCPServer:
(1.) 第一步:編寫IoService
  按照上面的執行流程,我們首先需要編寫IoService,IoService 本身既是服務端,又是客戶端,我們這裡編寫服務端,所以使用IoAcceptor 實現,由於IoAcceptor 是與協議無關的,因為我們要編寫TCPServer,所以我們使用IoAcceptor 的實現NioSocketAcceptor,實際上底層就是呼叫java.nio.channels.ServerSocketChannel 類。當然,如果你使用了Apache 的APR 庫,那麼你可以選擇使AprSocketAcceptor 作為TCPServer 的實現,據傳說Apache APR庫的效能比JVM 自帶的本地庫高出很多。那麼IoProcessor 是由指定的IoService 內部建立並呼叫的,我們並不需要關心。

IoAcceptor acceptor=new NioSocketAcceptor();  
acceptor.getSessionConfig().setReadBufferSize(2048);  
acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);  
acceptor.bind(new InetSocketAddress(9123)); 

這段程式碼我們初始化了服務端的TCP/IP 的基於NIO 的套接字,然後呼叫IoSessionConfig設定讀取資料的緩衝區大小、讀寫通道均在10 秒內無任何操作就進入空閒狀態。

(2.) 第二步:編寫過濾器
這裡我們處理最簡單的字串傳輸,Mina 已經為我們提供了TextLineCodecFactory 編解碼器工廠來對字串進行編解碼處理。

acceptor.getFilterChain().addLast("codec",  new ProtocolCodecFilter(new TextLineCodecFactory(  
Charset.forName("UTF-8"),  
LineDelimeter.WINDOWS.getValue(),  
LineDelimiter. WINDOWS.getValue()))  
);  

這段程式碼要在acceptor.bind()方法之前執行,因為繫結套接字之後就不能再做這些準備工作了。這裡先不用清楚編解碼器是如何工作的,這個是後面重點說明的內容,這裡你只需要清楚,我們傳輸的以換行符為標識的資料,所以使用了Mina 自帶的換行符編解碼器工廠。

(3.) 第三步:編寫IoHandler

這裡我們只是簡單的列印Client 傳說過來的資料。

public class MyIoHandler extends IoHandlerAdapter {  
// 這裡我們使用的SLF4J作為日誌門面,至於為什麼在後面說明。  
private final static Logger log = LoggerFactory  
.getLogger(MyIoHandler.class);  
@Override  
public void messageReceived(IoSession session, Object message)  
throws Exception {  
String str = message.toString();  
log.info("The message received is [" + str + "]");  
if (str.endsWith("quit")) {  
session.close(true);  
return;  
}  
}  
}  

然後我們把這個IoHandler 註冊到IoService:
acceptor.setHandler(new MyIoHandler());  

當然這段程式碼也要在acceptor.bind()方法之前執行。然後我們執行MyServer 中的main 方法,你可以看到控制檯一直處於阻塞狀態,此時,我們用telnet 127.0.0.1 9123 訪問,然後輸入一些內容,當按下回車鍵,你會發現資料在Server 端被輸出,但要注意不要輸入中文,因為Windows 的命令列視窗不會對傳輸的資料進行UTF-8 編碼。當輸入quit 結尾的字串時,連線被斷開。這裡注意你如果使用的作業系統,或者使用的Telnet 軟體的換行符是什麼,如果不清楚,可以刪掉第二步中的兩個紅色的引數,使用TextLineCodec 內部的自動識別機制。

2. 簡單的TCPClient:
這裡我們實現Mina 中的TCPClient,因為前面說過無論是Server 端還是Client 端,在Mina中的執行流程都是一樣的。唯一不同的就是IoService 的Client 端實現是IoConnector。

(1.) 第一步:編寫IoService並註冊過濾器

public class MyClient {  
main方法:  
IoConnector connector=new NioSocketConnector();  
connector.setConnectTimeoutMillis(30000);  
connector.getFilterChain().addLast("codec",  
new ProtocolCodecFilter(  
new TextLineCodecFactory(  
Charset.forName("UTF-8"),  
LineDelimiter.WINDOWS.getValue(),  
LineDelimiter.WINDOWS.getValue()  
)  
)  
);  
connector.connect(new InetSocketAddress("localhost", 9123));  
}  

(2.) 第三步:編寫IoHandler
public class ClientHandler extends IoHandlerAdapter {  
private final static Logger LOGGER = LoggerFactory  
.getLogger(ClientHandler.class);  
private final String values;  
public ClientHandler(String values) {  
this.values = values;  
}  
@Override  
public void sessionOpened(IoSession session) {  
session.write(values);  
}  
}  

註冊IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));  

然後我們執行MyClient,你會發現MyServer 輸出如下語句:
The message received is [你好!]
The message received is [大家好!]
我們看到服務端是按照收到兩條訊息輸出的,因為我們用的編解碼器是以換行符判斷資料是否讀取完畢的。

3. 介紹Mina的TCP的主要介面:
通過上面的兩個示例,你應該對Mina 如何編寫TCP/IP 協議棧的網路通訊有了一些感性的認識。
(1.)IoService:
這個介面是服務端IoAcceptor、客戶端IoConnector 的抽象,提供IO 服務和管理IoSession的功能,它有如下幾個常用的方法:
A. TransportMetadata getTransportMetadata():
這個方法獲取傳輸方式的元資料描述資訊,也就是底層到底基於什麼的實現,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
這個方法可以為IoService 增加一個監聽器,用於監聽IoService 的建立、活動、失效、空閒、銷燬,具體可以參考IoServiceListener 介面中的方法,這為你參與IoService 的生命週期提供了機會。
C. void removeListener(IoServiceListener listener):
這個方法用於移除上面的方法新增的監聽器。
D. void setHandler(IoHandler handler):
這個方法用於向IoService 註冊IoHandler,同時有getHandler()方法獲取Handler。
E. Map<Long,IoSession> getManagedSessions():
這個方法獲取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
這個方法用於獲取IoSession 的配置物件,通過IoSessionConfig 物件可以設定Socket 連線的一些選項。

(2.)IoAcceptor:
這個介面是TCPServer 的介面,主要增加了void bind()監聽埠、void unbind()解除對套接字的監聽等方法。這裡與傳統的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次呼叫bind()方法(或者在一個方法中傳入多個SocketAddress 引數)同時監聽多個埠。

3.)IoConnector:
這個介面是TCPClient 的介面, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用於與Server 端建立連線,第二個引數如果不傳遞則使用本地的一個隨機埠訪問Server 端。這個方法是非同步執行的,同樣的,也可以同時連線多個服務端。

(4.)IoSession:
這個介面用於表示Server 端與Client 端的連線,IoAcceptor.accept()的時候返回例項。
這個介面有如下常用的方法:
A. WriteFuture write(Object message):
這個方法用於寫資料,該操作是非同步的。
B. CloseFuture close(boolean immediately):
這個方法用於關閉IoSession,該操作也是非同步的,引數指定true 表示立即關閉,否則就在所有的寫操作都flush 之後再關閉。
C. Object setAttribute(Object key,Object value):
這個方法用於給我們向會話中新增一些屬性,這樣可以在會話過程中都可以使用,類似於HttpSession 的setAttrbute()方法。IoSession 內部使用同步的HashMap 儲存你新增的自
定義屬性。
D. SocketAddress getRemoteAddress():
這個方法獲取遠端連線的套接字地址。
E. void suspendWrite():
這個方法用於掛起寫操作,那麼有void resumeWrite()方法與之配對。對於read()方法同樣適用。
F. ReadFuture read():
這個方法用於讀取資料, 但預設是不能使用的, 你需要呼叫IoSessionConfig 的setUseReadOperation(true)才可以使用這個非同步讀取的方法。一般我們不會用到這個方法,因為這個方法的內部實現是將資料儲存到一個BlockingQueue,假如是Server 端,因為大量的Client 端傳送的資料在Server 端都這麼讀取,那麼可能會導致記憶體洩漏,但對於Client,可能有的時候會比較便利。
G. IoService getService():
這個方法返回與當前會話物件關聯的IoService 例項。
關於TCP連線的關閉:
無論在客戶端還是服務端,IoSession 都用於表示底層的一個TCP 連線,那麼你會發現無論是Server 端還是Client 端的IoSession 呼叫close()方法之後,TCP 連線雖然顯示關閉, 但主執行緒仍然在執行,也就是JVM 並未退出,這是因為IoSession 的close()僅僅是關閉了TCP的連線通道,並沒有關閉Server 端、Client 端的程式。你需要呼叫IoService 的dispose()方法停止Server 端、Client 端。

(5.)IoSessionConfig:
這個方法用於指定此次會話的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):

這個方法設定讀取緩衝的位元組數,但一般不需要呼叫這個方法,因為IoProcessor 會自動調整緩衝的大小。你可以呼叫setMinReadBufferSize()、setMaxReadBufferSize()方法,這樣無論IoProcessor 無論如何自動調整,都會在你指定的區間。
B. void setIdleTime(IdleStatus status,int idleTime):
這個方法設定關聯在通道上的讀、寫或者是讀寫事件在指定時間內未發生,該通道就進入空閒狀態。一旦呼叫這個方法,則每隔idleTime 都會回撥過濾器、IoHandler 中的sessionIdle()方法。
C. void setWriteTimeout(int time):
這個方法設定寫操作的超時時間。
D. void setUseReadOperation(boolean useReadOperation):
這個方法設定IoSession 的read()方法是否可用,預設是false。

(6.)IoHandler:
這個介面是你編寫業務邏輯的地方,從上面的示例程式碼可以看出,讀取資料、傳送資料基本都在這個介面總完成,這個例項是繫結到IoService 上的,有且只有一個例項(沒有給一個IoService 注入一個IoHandler 例項會丟擲異常)。它有如下幾個方法:
A. void sessionCreated(IoSession session):
這個方法當一個Session 物件被建立的時候被呼叫。對於TCP 連線來說,連線被接受的時候呼叫,但要注意此時TCP 連線並未建立,此方法僅代表字面含義,也就是連線的物件IoSession 被建立完畢的時候,回撥這個方法。對於UDP 來說,當有資料包收到的時候回撥這個方法,因為UDP 是無連線的。
B. void sessionOpened(IoSession session):
這個方法在連線被開啟時呼叫,它總是在sessionCreated()方法之後被呼叫。對於TCP 來說,它是在連線被建立之後呼叫,你可以在這裡執行一些認證操作、傳送資料等。對於UDP 來說,這個方法與sessionCreated()沒什麼區別,但是緊跟其後執行。如果你每隔一段時間,傳送一些資料,那麼sessionCreated()方法只會在第一次呼叫,但是sessionOpened()方法每次都會呼叫。
C. void sessionClosed(IoSession session) :
對於TCP 來說,連線被關閉時,呼叫這個方法。對於UDP 來說,IoSession 的close()方法被呼叫時才會毀掉這個方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
這個方法在IoSession 的通道進入空閒狀態時呼叫,對於UDP 協議來說,這個方法始終不會被呼叫。
E. void exceptionCaught(IoSession session, Throwable cause) :
這個方法在你的程式、Mina 自身出現異常時回撥,一般這裡是關閉IoSession。

F. void messageReceived(IoSession session, Object message) :
接收到訊息時呼叫的方法,也就是用於接收訊息的方法,一般情況下,message 是一個IoBuffer 類,如果你使用了協議編解碼器,那麼可以強制轉換為你需要的型別。通常我們都是會使用協議編解碼器的, 就像上面的例子, 因為協議編解碼器是
TextLineCodecFactory,所以我們可以強制轉message 為String 型別。
G. void messageSent(IoSession session, Object message) :
當傳送訊息成功時呼叫這個方法,注意這裡的措辭,傳送成功之後,也就是說傳送訊息是不能用這個方法的。
傳送訊息的時機:
傳送訊息應該在sessionOpened()、messageReceived()方法中呼叫IoSession.write()方法完成。因為在sessionOpened()方法中,TCP 連線已經真正開啟,同樣的在messageReceived()方法TCP 連線也是開啟狀態,只不過兩者的時機不同。sessionOpened()方法是在TCP 連線建立之後,接收到資料之前傳送;messageReceived()方法是在接收到資料之後傳送,你可以完成依據收到的內容是什麼樣子,決定傳送什麼樣的資料。因為這個介面中的方法太多,因此通常使用介面卡模式IoHandlerAdapter,覆蓋你所感興趣的方法即可。

(7.)IoBuffer:
這個介面是對JAVA NIO 的ByteBuffer 的封裝,這主要是因為ByteBuffer 只提供了對基本資料型別的讀寫操作,沒有提供對字串等物件型別的讀寫方法,使用起來更為方便,另外,ByteBuffer 是定長的,如果想要可變,將很麻煩。IoBuffer 的可變長度的實現類似於StringBuffer。IoBuffer 與ByteBuffer 一樣,都是非執行緒安全的。本節的一些內容如果不清楚,可以參考java.nio.ByteBuffer 介面。這個介面有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
這個方法內部通過SimpleBufferAllocator 建立一個例項,第一個引數指定初始化容量,第二個引數指定使用直接緩衝區還是JAVA 記憶體堆的快取區,預設為false。
B. void free():
釋放緩衝區,以便被一些IoBufferAllocator 的實現重用,一般沒有必要呼叫這個方法,除非你想提升效能(但可能未必效果明顯)。
C. IoBuffer setAutoExpand(boolean autoExpand):
這個方法設定IoBuffer 為自動擴充套件容量,也就是前面所說的長度可變,那麼可以看出長度可變這個特性預設是不開啟的。
D. IoBuffer setAutoShrink(boolean autoShrink):
這個方法設定IoBuffer 為自動收縮,這樣在compact()方法呼叫之後,可以裁減掉一些沒有使用的空間。如果這個方法沒有被呼叫或者設定為false,你也可以通過呼叫shrink()方法手動收縮空間。

E. IoBuffer order(ByteOrder bo):
這個方法設定是Big Endian 還是Little Endian,JAVA 中預設是Big Endian,C++和其他語言一般是Little Endian。
F. IoBuffer asReadOnlyBuffer():
這個方法設定IoBuffer 為只讀的。
G. Boolean prefixedDataAvailable(int prefixLength,int maxDataLength):
這個方法用於資料的最開始的1、2、4 個位元組表示的是資料的長度的情況,

prefixLentgh表示這段資料的前幾個位元組(只能是1、2、4 的其中一個),代表的是這段資料的長度,
maxDataLength 表示最多要讀取的位元組數。返回結果依賴於等式
remaining()-prefixLength>=maxDataLength,也就是總的資料-表示長度的位元組,剩下的位元組數要比打算讀取的位元組數大或者相等。
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
如果上面的方法返回true,那麼這個方法將開始讀取表示長度的位元組之後的資料,注意要保持這兩個方法的prefixLength 的值是一樣的。
G、H 兩個方法在後面講到的PrefixedStringDecoder 中的內部實現使用。
IoBuffer 剩餘的方法與ByteBuffer 都是差不多的,額外增加了一些便利的操作方法,例如:
IoBuffer putString(String value,CharsetEncoder encoder)可以方便的以指定的編碼方式儲存字串、InputStream asInputStream()方法從IoBuffer 剩餘的未讀的資料中轉為輸入流等。

(8.)IoFuture:
在Mina 的很多操作中,你會看到返回值是XXXFuture,實際上他們都是IoFuture 的子類,看到這樣的返回值,這個方法就說明是非同步執行的,主要的子類有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。這個介面的大部分操作都和
java.util.concurrent.Future 介面是類似的,譬如:await()、awaitUninterruptibly()等,一般我們常用awaitUninterruptibly()方法可以等待非同步執行的結果返回。這個介面有如下常用的方法:
A. IoFuture addListener(IoFutureListener<?> listener):
這個方法用於新增一個監聽器, 在非同步執行的結果返回時監聽器中的回撥方法operationComplete(IoFuture future),也就是說,這是替代awaitUninterruptibly()方法另一種等待非同步執行結果的方法,它的好處是不會產生阻塞。
B. IoFuture removeListener(IoFutureListener<?> listener):
這個方法用於移除指定的監聽器。
C. IoSession getSession():
這個方法返回當前的IoSession。舉個例子,我們在客戶端呼叫connect()方法訪問Server 端的時候,實際上這就是一個非同步執行的方法,也就是呼叫connect()方法之後立即返回,執行下面的程式碼,而不管是否連

接成功。那麼如果我想在連線成功之後執行一些事情(譬如:獲取連線成功後的IoSession物件),該怎麼辦呢?按照上面的說明,你有如下兩種辦法:

第一種:
ConnectFuture future = connector.connect(new InetSocketAddress(  
HOSTNAME, PORT));  
// 等待是否連線成功,相當於是轉非同步執行為同步執行。  
future.awaitUninterruptibly();  
// 連線成功後獲取會話物件。如果沒有上面的等待,由於connect()方法是非同步的,session  
可能會無法獲取。  
session = future.getSession();  

第二種:
ConnectFuture future = connector.connect(new InetSocketAddress(  
HOSTNAME, PORT));  
future.addListener(new IoFutureListener<ConnectFuture>() {  
@Override  
public void operationComplete(ConnectFuture future) {  
try {  
Thread.sleep(5000);  
} catch (InterruptedException e) {  
e.printStackTrace();  
}  
IoSession session = future.getSession();  
System.out.println("++++++++++++++++++++++++++++");  
}  
});  
System.out.println("*************");  

為了更好的看清楚使用監聽器是非同步的,而不是像awaitUninterruptibly()那樣會阻塞主執行緒的執行,我們在回撥方法中暫停5 秒鐘,然後輸出+++,在最後輸出***。我們執行程式碼之後,你會發現首先輸出***(這證明了監聽器是非同步執行的),然後IoSession 物件Created,系統暫停5 秒,然後輸出+++,最後IoSession 物件Opened,也就是TCP 連線建立。

4.日誌配置:
前面的示例程式碼中提到了使用SLF4J 作為日誌門面,這是因為Mina 內部使用的就是SLF4J,你也使用SLF4J 可以與之保持一致性。Mina 如果想啟用日誌跟蹤Mina 的執行細節,你可以配置LoggingFilter 過濾器,這樣你可
以看到Session 建立、開啟、空閒等一系列細節在日誌中輸出,預設SJF4J 是按照DEBUG級別輸出跟蹤資訊的,如果你想給某一類別的Mina 執行資訊輸出指定日誌輸出級別,可以呼叫LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。

例:

LoggingFilter lf = new LoggingFilter();  
lf.setSessionOpenedLogLevel(LogLevel.ERROR);  
acceptor.getFilterChain().addLast("logger", lf);  

這裡IoSession 被開啟的跟蹤資訊將以ERROR 級別輸出到日誌。

5.過濾器:
前面我們看到了LoggingFilter、ProtocolCodecFilter 兩個過濾器,一個負責日誌輸出,一個負責資料的編解碼,通過最前面的Mina 執行流程圖,在IoProcessor 與IoHandler 之間可以有很多的過濾器,這種設計方式為你提供可插拔似的擴充套件功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的攔截器也都是一樣的設計思路。Mina 中的IoFilter 是單例的,這與CXF、Apache Struts2 沒什麼區別。IoService 例項上會繫結一個DefaultIoFilterChainBuilder 例項,DefaultIoFilterChainBuilder 會把使用內部的EntryImpl 類把所有的過濾器按照順序連在一起,組成一個過濾器鏈。
DefaultIoFilterChainBuilder 類如下常用的方法:
A. void addFirst(String name,IoFilter filter):
這個方法把過濾器新增到過濾器鏈的頭部,頭部就是IoProcessor 之後的第一個過濾器。同樣的addLast()方法把過濾器新增到過濾器鏈的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
這個方法將過濾器新增到baseName 指定的過濾器的前面,同樣的addAfter()方法把過濾器新增到baseName 指定的過濾器的後面。這裡要注意無論是那種新增方法,每個過濾器的名字(引數name)必須是唯一的。
C. IoFilter remove(Stirng name):
這個方法移除指定名稱的過濾器,你也可以呼叫另一個過載的remove()方法,指定要移除的IoFilter 的型別。
D. List<Entry> getAll():
這個方法返回當前IoService 上註冊的所有過濾器。預設情況下,過濾器鏈中是空的,也就是getAll()方法返回長度為0 的List,但實際Mina內部有兩個隱藏的過濾器:HeadFilter、TailFilter,分別在List 的最開始和最末端,很明顯,TailFilter 在最末端是為了呼叫過濾器鏈之後,呼叫IoHandler。但這兩個過濾器對你來說是透明的,可以忽略它們的存在。編寫一個過濾器很簡單,你需要實現IoFilter 介面,如果你只關注某幾個方法,可以繼承IoFilterAdapter 介面卡類。IoFilter 介面中主要包含兩類方法,一類是與IoHandler 中的方法名一致的方法,相當於攔截IoHandler 中的方法,另一類是IoFilter 的生命週期回撥方法,這些回撥方法的執行順序和解釋如下所示:

(1.)init()在首次新增到鏈中的時候被呼叫,但你必須將這個IoFilter 用
ReferenceCountingFilter 包裝起來,否則init()方法永遠不會被呼叫。
(2.)onPreAdd()在呼叫新增到鏈中的方法時被呼叫,但此時還未真正的加入到鏈。
(3.)onPostAdd()在呼叫新增到鏈中的方法後被調,如果在這個方法中有異常丟擲,則過濾器會立即被移除,同時destroy()方法也會被呼叫(前提是使用ReferenceCountingFilter包裝)。
(4.)onPreRemove()在從鏈中移除之前呼叫。
(5.)onPostRemove()在從鏈中移除之後呼叫。
(6.)destory()在從鏈中移除時被呼叫,使用方法與init()要求相同。
無論是哪個方法,要注意必須在實現時呼叫引數nextFilter 的同名方法,否則,過濾器鏈的執行將被中斷,IoHandler 中的同名方法一樣也不會被執行,這就相當於Servlet 中的Filter 必須呼叫filterChain.doFilter(request,response)才能繼續前進是一樣的道理。
示例:

public class MyIoFilter implements IoFilter {  
@Override  
public void destroy() throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy");  
}  
@Override  
public void exceptionCaught(NextFilter nextFilter, IoSession  
session,  
Throwable cause) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught");  
nextFilter.exceptionCaught(session, cause);  
}  
@Override  
public void filterClose(NextFilter nextFilter, IoSession session)  
throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose");  
nextFilter.filterClose(session);  
}  
@Override  
public void filterWrite(NextFilter nextFilter, IoSession session,  
WriteRequest writeRequest) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite");  
nextFilter.filterWrite(session, writeRequest);  
}  
@Override  
public void init() throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init");  
}  
@Override  
public void messageReceived(NextFilter nextFilter, IoSession  
session,  
Object message) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived");  
nextFilter.messageReceived(session, message);  
}  
@Override  
public void messageSent(NextFilter nextFilter, IoSession session,  
WriteRequest writeRequest) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent");  
nextFilter.messageSent(session, writeRequest);  
}  
@Override  
public void onPostAdd(IoFilterChain parent, String name,  
NextFilter nextFilter) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd");  
}  
@Override  
public void onPostRemove(IoFilterChain parent, String name,  
NextFilter nextFilter) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove");  
}  
@Override  
public void onPreAdd(IoFilterChain parent, String name,  
NextFilter nextFilter) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd");  
}  
@Override  
public void onPreRemove(IoFilterChain parent, String name,  
NextFilter nextFilter) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove");  
}  
   
@Override  
public void sessionClosed(NextFilter nextFilter, IoSession session)  
throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed");  
nextFilter.sessionClosed(session);  
}  
@Override  
public void sessionCreated(NextFilter nextFilter, IoSession session)  
throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated");  
nextFilter.sessionCreated(session);  
}  
@Override  
public void sessionIdle(NextFilter nextFilter, IoSession session,  
IdleStatus status) throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle");  
nextFilter.sessionIdle(session, status);  
}  
@Override  
public void sessionOpened(NextFilter nextFilter, IoSession session)  
throws Exception {  
System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened");  
nextFilter.sessionOpened(session);  
}  
}  

我們將這個攔截器註冊到上面的TCPServer 的IoAcceptor 的過濾器鏈中的最後一個:
acceptor.getFilterChain().addLast("myIoFilter",  
new ReferenceCountingFilter(new MyIoFilter()));  

這裡我們將MyIoFilter 用ReferenceCountingFilter 包裝起來,這樣你可以看到init()、destroy()方法呼叫。我們啟動客戶端訪問,然後關閉客戶端,你會看到執行順序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filterClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的對應方法會跟在上面的對應方法之後執行,這也就是說從橫向(單獨的看一個過濾器中的所有方法的執行順序)上看,每個過濾器的執行順序是上面所示的順序;從縱向(方法鏈的呼叫)上看,如果有filter1、filter2 兩個過濾器,sessionCreated()方法的執行順序如下所示:

filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
這裡你要注意init、onPreAdd、onPostAdd 三個方法並不是在Server 啟動時呼叫的,而是IoSession 物件建立之前呼叫的,也就是說IoFilterChain.addXXX()方法僅僅負責初始化過濾器並註冊過濾器,但並不呼叫任何方法,包括init()初始化方法也是在IoProcessor 開始工作的時候被呼叫。IoFilter 是單例的,那麼init()方法是否只被執行一次呢?這個是不一定的,因為IoFilter是被IoProcessor 呼叫的,而每個IoService 通常是關聯多個IoProcessor,所以IoFilter的init()方法是在每個IoProcessor 執行緒上只執行一次。關於Mina 的執行緒問題,我們後面會詳細討論,這裡你只需要清楚,init()與destroy()的呼叫次數與IoProceesor 的個數有關,假如一個IoService 關聯了3 個IoProcessor,有五個併發的客戶端請求,那麼你會看到三次init()方法被呼叫,以後將不再會呼叫。Mina中自帶的過濾器:
過濾器 說明
BlacklistFilter 設定一些IP 地址為黑名單,不允許訪問。
BufferedWriteFilter 設定輸出時像BufferedOutputStream 一樣進行緩衝。
CompressionFilter 設定在輸入、輸出流時啟用JZlib 壓縮。
ConnectionThrottleFilter 這個過濾器指定同一個IP 地址(不含埠號)上的請求在多長的毫秒值內可以有一個請求,如果小於指定的時間間隔就有連續兩個請求,那麼第二個請求將被忽略(IoSession.close())。正如Throttle 的名字一樣,調節訪問的頻率這個過濾器最好放在過濾器鏈的前面。
FileRegionWriteFilter 如果你想使用File 物件進行輸出,請使用這個過濾器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中關閉File 所關聯的FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 物件進行輸出,請使用這個過濾器。要注意,你需要使用WriteFuture或者在messageSent()方法中關閉File 所關聯的
FileChannel 通道。NoopFilter 這個過濾器什麼也不做,如果你想測試過濾器鏈是否起作用,可以用它來測試。
ProfilerTimerFilter 這個過濾器用於檢測每個事件方法執行的時間,所以最好放在過濾器鏈的前面。
ProxyFilter 這個過濾器在客戶端使用ProxyConnector 作為實現時,會自動加入到過濾器鏈中,用於完成代理功能。
RequestResponseFilter 暫不知曉。

SessionAttributeInitializingFilter 這個過濾器在IoSession 中放入一些屬性(Map),通常放在過濾器的前面,用於放置一些初始化的資訊。
MdcInjectionFilter 針對日誌輸出做MDC 操作,可以參考LOG4J 的MDC、NDC 的文件。
WriteRequestFilter CompressionFilter、RequestResponseFilter 的基類,用於包裝寫請求的過濾器。
還有一些過濾器,會在各節中詳細討論,這裡沒有列出,譬如:前面的LoggingFilger 日誌過濾器。

6.協議編解碼器:
前面說過,協議編解碼器是在使用Mina 的時候你最需要關注的物件,因為在網路傳輸的資料都是二進位制資料(byte),而你在程式中面向的是JAVA 物件,這就需要你實現在傳送資料時將JAVA 物件編碼二進位制資料,而接收資料時將二進位制資料解碼為JAVA 物件(這個可不是JAVA 物件的序列化、反序列化那麼簡單的事情)。Mina 中的協議編解碼器通過過濾器ProtocolCodecFilter 構造,這個過濾器的構造方法需要一個ProtocolCodecFactory,這從前面註冊TextLineCodecFactory 的程式碼就可以看出來。
ProtocolCodecFactory 中有如下兩個方法:
public interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
因此,構建一個ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 兩個例項。你可能要問JAVA 物件和二進位制資料之間如何轉換呢?這個要依據具體的通訊協議,也就是Server 端要和Client 端約定網路傳輸的資料是什麼樣的格式,譬如:第一個位元組表示資料長度,第二個位元組是資料型別,後面的就是真正的資料(有可能是文字、有可能是圖片等等),然後你可以依據長度從第三個位元組向後讀,直到讀取到指定第一個位元組指定長度的資料。
簡單的說,HTTP 協議就是一種瀏覽器與Web 伺服器之間約定好的通訊協議,雙方按照指定的協議編解碼資料。我們再直觀一點兒說,前面一直使用的TextLine 編解碼器就是在讀取網路上傳遞過來的資料時,只要發現哪個位元組裡存放的是ASCII 的10、13 字元(/r、/n),就認為之前的位元組就是一個字串(預設使用UTF-8 編碼)。以上所說的就是各種協議實際上就是網路七層結構中的應用層協議,它位於網路層(IP)、傳輸層(TCP)之上,Mina 的協議編解碼器就是讓你實現一套自己的應用層協議棧。

(6-1.)簡單的編解碼器示例:
下面我們舉一個模擬電信運營商簡訊協議的編解碼器實現,假設通訊協議如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx

L: 21
Hello World!
這裡的第一行表示狀態行,一般表示協議的名字、版本號等,第二行表示簡訊的傳送號碼,第三行表示簡訊接收的號碼,第四行表示簡訊的位元組數,最後的內容就是簡訊的內容。上面的每一行的末尾使用ASC II 的10(/n)作為換行符,因為這是純文字資料,協議要
求雙方使用UTF-8 對字串編解碼。實際上如果你熟悉HTTP 協議,上面的這個精簡的簡訊協議和HTTP 協議的組成是非常像的,第一行是狀態行,中間的是訊息報頭,最後面的是訊息正文。在解析這個簡訊協議之前,你需要知曉TCP 的一個事項,那就是資料的傳送沒有規模性,所謂的規模性就是作為資料的接收端,不知道到底什麼時候資料算是讀取完畢,所以應用層協議在制定的時候,必須指定資料讀取的截至點。一般來說,有如下三種方式設定資料讀取的長度:
(1.)使用分隔符,譬如:TextLine 編解碼器。你可以使用/r、/n、NUL 這些ASC II 中的特殊的字元來告訴資料接收端,你只要遇見分隔符,就表示資料讀完了,不用在那裡傻等著不知道還有沒有資料沒讀完啊?我可不可以開始把已經讀取到的位元組解碼為指定的資料型別了啊?
(2.)定長的位元組數,這種方式是使用長度固定的資料傳送,一般適用於指令傳送,譬如:資料傳送端規定傳送的資料都是雙位元組,AA 表示啟動、BB 表示關閉等等。
(3.)在資料中的某個位置使用一個長度域,表示資料的長度,這種處理方式最為靈活,上面的簡訊協議中的那個L 就是簡訊文字的位元組數,其實HTTP 協議的訊息報頭中的Content-Length 也是表示訊息正文的長度,這樣資料的接收端就知道我到底讀到多長的
位元組數就表示不用再讀取資料了。相比較解碼(位元組轉為JAVA 物件,也叫做拆包)來說,編碼(JAVA 物件轉為位元組,也叫做打包)就很簡單了,你只需要把JAVA 物件轉為指定格式的位元組流,write()就可以了。下面我們開始對上面的簡訊協議進行編解碼處理。
第一步,協議物件:

public class SmsObject {  
private String sender;// 簡訊傳送者  
private String receiver;// 簡訊接受者  
private String message;// 簡訊內容  
public String getSender() {  
return sender;  
}  
public void setSender(String sender) {  
this.sender = sender;  
}  
public String getReceiver() {  
return receiver;  
}  
public void setReceiver(String receiver) {  
this.receiver = receiver;  
}  
public String getMessage() {  
return message;  
}  
public void setMessage(String message) {  
this.message = message;  
}  
}  

第二步,編碼器:
在Mina 中編寫編碼器可以實現ProtocolEncoder,其中有encode()、dispose()兩個方法需要實現。這裡的dispose()方法用於在銷燬編碼器時釋放關聯的資源,由於這個方法一般我們並不關心,所以通常我們直接繼承介面卡ProtocolEncoderAdapter。
public class CmccSipcEncoder extends ProtocolEncoderAdapter {  
private final Charset charset;  
public CmccSipcEncoder(Charset charset) {  
this.charset = charset;  
}  
@Override  
public void encode(IoSession session, Object message,  
ProtocolEncoderOutput out) throws Exception {  
SmsObject sms = (SmsObject) message;  
CharsetEncoder ce = charset.newEncoder();  
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  
String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";  
String sender = sms.getSender();  
String receiver = sms.getReceiver();  
String smsContent = sms.getMessage();  
buffer.putString(statusLine + '/n', ce);  
buffer.putString("S: " + sender + '/n', ce);  
buffer.putString("R: " + receiver + '/n', ce);  
buffer  
.putString("L: " + (smsContent.getBytes(charset).length)  
+ "/n",  
ce);  
buffer.putString(smsContent, ce);  
buffer.flip();  
out.write(buffer);  
}  
}  

這裡我們依據傳入的字符集型別對message 物件進行編碼,編碼的方式就是按照簡訊協議拼裝字串到IoBuffer 緩衝區,然後呼叫ProtocolEncoderOutput 的write()方法輸出位元組流。這裡要注意生成簡訊內容長度時的紅色程式碼,我們使用String 類與Byte[]型別之間的轉換方法獲得轉為位元組流後的位元組數。
解碼器的編寫有以下幾個步驟:
A. 將 encode()方法中的message 物件強制轉換為指定的物件型別;
B. 建立IoBuffer 緩衝區物件,並設定為自動擴充套件;
C. 將轉換後的message 物件中的各個部分按照指定的應用層協議進行組裝,並put()到IoBuffer 緩衝區;
D. 當你組裝資料完畢之後,呼叫flip()方法,為輸出做好準備,切記在write()方法之前,要呼叫IoBuffer 的flip()方法,否則緩衝區的position 的後面是沒有資料可以用來輸出的,你必須呼叫flip()方法將position 移至0,limit 移至剛才的position。這個flip()方法的含義請參看java.nio.ByteBuffer。
E. 最後呼叫ProtocolEncoderOutput 的write()方法輸出IoBuffer 緩衝區例項。
第三步,解碼器:
在Mina 中編寫解碼器,可以實現ProtocolDecoder 介面,其中有decode()、finishDecode()、dispose()三個方法。這裡的finishDecode()方法可以用於處理在IoSession 關閉時剩餘的未讀取資料,一般這個方法並不會被使用到,除非協議中未定義任何標識資料什麼時候截止的約定,譬如:Http 響應的Content-Length 未設定,那麼在你認為讀取完資料後,關閉TCP連線(IoSession 的關閉)後,就可以呼叫這個方法處理剩餘的資料,當然你也可以忽略調剩餘的資料。同樣的,一般情況下,我們只需要繼承介面卡ProtocolDecoderAdapter,關注decode()方法即可。但前面說過解碼器相對編碼器來說,最麻煩的是資料傳送過來的規模,以聊天室為例,一個TCP 連線建立之後,那麼隔一段時間就會有聊天內容傳送過來,也就是decode()方法會被往復呼叫,這樣處理起來就會非常麻煩。那麼Mina 中幸好提供了CumulativeProtocolDecoder類,從名字上可以看出累積性的協議解碼器,也就是說只要有資料傳送過來,這個類就會去讀取資料,然後累積到內部的IoBuffer 緩衝區,但是具體的拆包(把累積到緩衝區的資料解碼為JAVA 物件)交由子類的doDecode()方法完成,實際上CumulativeProtocolDecoder就是在decode()反覆的呼叫暴漏給子類實現的doDecode()方法。
具體執行過程如下所示:
A. 你的doDecode()方法返回true 時,CumulativeProtocolDecoder 的decode()方法會首先判斷你是否在doDecode()方法中從內部的IoBuffer 緩衝區讀取了資料,如果沒有,則會丟擲非法的狀態異常,也就是你的doDecode()方法返回true 就表示你已經消費了本次資料(相當於聊天室中一個完整的訊息已經讀取完畢),進一步說,也就是此時你必須已經消費過內部的IoBuffer 緩衝區的資料(哪怕是消費了一個位元組的資料)。如果驗證過通過,那麼CumulativeProtocolDecoder 會檢查緩衝區內是否還有資料未讀取,如果有就繼續呼叫doDecode()方法,沒有就停止對doDecode()方法的呼叫,直到有新的資料被緩衝。

B. 當你的doDecode()方法返回false 時,CumulativeProtocolDecoder 會停止對doDecode()方法的呼叫,但此時如果本次資料還有未讀取完的,就將含有剩餘資料的IoBuffer 緩衝區儲存到IoSession 中,以便下一次資料到來時可以從IoSession 中提取合併。如果發現本次資料全都讀取完畢,則清空IoBuffer 緩衝區。簡而言之,當你認為讀取到的資料已經夠解碼了,那麼就返回true,否則就返回false。這個CumulativeProtocolDecoder 其實最重要的工作就是幫你完成了資料的累積,因為這個工作是很煩瑣的。

public class CmccSipcDecoder extends CumulativeProtocolDecoder {  
private final Charset charset;  
public CmccSipcDecoder(Charset charset) {  
this.charset = charset;  
}  
@Override  
protected boolean doDecode(IoSession session, IoBuffer in,  
ProtocolDecoderOutput out) throws Exception {  
IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);  
CharsetDecoder cd = charset.newDecoder();  
int matchCount = 0;  
String statusLine = "", sender = "", receiver = "", length = "",  
sms = "";  
int i = 1;  
while (in.hasRemaining()) {  
byte b = in.get();  
buffer.put(b);  
if (b == 10 && i < 5) {  
matchCount++;  
if (i == 1) {  
buffer.flip();  
statusLine = buffer.getString(matchCount, cd);  
statusLine = statusLine.substring(0,  
statusLine.length() - 1);  
matchCount = 0;  
buffer.clear();  
}  
if (i == 2) {  
buffer.flip();  
sender = buffer.getString(matchCount, cd);  
sender = sender.substring(0, sender.length() -1);  
matchCount = 0;  
buffer.clear();  
}  
if (i == 3) {  
buffer.flip();  
receiver = buffer.getString(matchCount, cd);  
receiver = receiver.substring(0, receiver.length()  
1);  
matchCount = 0;  
buffer.clear();  
}  
if (i == 4) {  
buffer.flip();  
length = buffer.getString(matchCount, cd);  
length = length.substring(0, length.length() -1);  
matchCount = 0;  
buffer.clear();  
}  
i++;  
} else if (i == 5) {  
matchCount++;  
if (matchCount == Long.parseLong(length.split(": ")[1]))  
{  
buffer.flip();  
sms = buffer.getString(matchCount, cd);  
i++;  
break;  
}  
} else {  
matchCount++;  
}  
}  
SmsObject smsObject = new SmsObject();  
smsObject.setSender(sender.split(": ")[1]);  
smsObject.setReceiver(receiver.split(": ")[1]);  
smsObject.setMessage(sms);  
out.write(smsObject);  
return false;  
}  
}  

我們的這個簡訊協議解碼器使用/n(ASCII 的10 字元)作為分解點,一個位元組一個位元組的讀取,那麼第一次發現/n 的位元組位置之前的部分,必然就是簡訊協議的狀態行,依次類推,你就可以解析出來發送者、接受者、簡訊內容長度。然後我們在解析簡訊內容時,使用獲取到的長度進行讀取。全部讀取完畢之後, 然後構造SmsObject 簡訊物件, 使用ProtocolDecoderOutput 的write()方法輸出,最後返回false,也就是本次資料全部讀取完畢,告知CumulativeProtocolDecoder 在本次資料讀取中不需要再呼叫doDecode()方法了。這裡需要注意的是兩個狀態變數i、matchCount,i 用於記錄解析到了簡訊協議中的哪一行(/n),matchCount 記錄在當前行中讀取到了哪一個位元組。狀態變數在解碼器中經常被使用,我們這裡的情況比較簡單,因為我們假定簡訊傳送是在一次資料傳送中完成的,所以狀態變數的使用也比較簡單。假如資料的傳送被拆成了多次(譬如:簡訊協議的簡訊內容、訊息報頭被拆成了兩次資料傳送),那麼上面的程式碼勢必就會存在問題,因為當第二次呼叫doDecode()方法時,狀態變數i、matchCount 勢必會被重置,也就是原來的狀態值並沒有被儲存。那麼我們如何解決狀態儲存的問題呢?答案就是將狀態變數儲存在IoSession 中或者是Decoder 例項自身,但推薦使用前者,因為雖然Decoder 是單例的,其中的例項變數儲存的狀態在Decoder 例項銷燬前始終保持,但Mina 並不保證每次呼叫doDecode()方法時都是同一個執行緒(這也就是說第一次呼叫doDecode()是IoProcessor-1 執行緒,第二次有可能就是IoProcessor-2 執行緒),這就會產生多執行緒中的例項變數的可視性(Visibility,具體請參考JAVA 的多執行緒知識)問題。IoSession中使用一個同步的HashMap 儲存物件,所以你不需要擔心多執行緒帶來的問題。使用IoSession 儲存解碼器的狀態變數通常的寫法如下所示:
A. 在解碼器中定義私有的內部類Context,然後將需要儲存的狀態變數定義在Context 中儲存。
B. 在解碼器中定義方法獲取這個Context 的例項,這個方法的實現要優先從IoSession 中獲取Context。
具體程式碼示例如下所示:
// 上下文作為儲存狀態的內部類的名字,意思很明顯,就是讓狀態跟隨上下文,在整個呼叫過程中都可以被保持。
public class XXXDecoder extends CumulativeProtocolDecoder{  
private final AttributeKey CONTEXT =  
new AttributeKey(getClass(), "context" );  
public Context getContext(IoSession session){  
Context ctx=(Context)session.getAttribute(CONTEXT);  
if(ctx==null){  
ctx=new Context();  
session.setAttribute(CONTEXT,ctx);  
}  
}  
private class Context {  
//狀態變數  
}  
}  

注意這裡我們使用了Mina 自帶的AttributeKey 類來定義儲存在IoSession 中的物件的鍵值,這樣可以有效的防止鍵值重複。另外,要注意在全部處理完畢之後,狀態要復位,譬如:聊天室中的一條訊息讀取完畢之後,狀態變數要變為初始值,以便下次處理時重新使用。
第四步,編解碼工廠:
public class CmccSipcCodecFactory implements ProtocolCodecFactory {  
private final CmccSipcEncoder encoder;  
private final CmccSipcDecoder decoder;  
public CmccSipcCodecFactory() {  
this(Charset.defaultCharset());  
}  
public CmccSipcCodecFactory(Charset charSet) {  
this.encoder = new CmccSipcEncoder(charSet);  
this.decoder = new CmccSipcDecoder(charSet);  
}  
@Override  
public ProtocolDecoder getDecoder(IoSession session) throws  
Exception {  
return decoder;  
}  
@Override  
public ProtocolEncoder getEncoder(IoSession session) throws  
Exception {  
return encoder;  
}  
}  

實際上這個工廠類就是包裝了編碼器、解碼器,通過介面中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 過濾器返回編解碼器例項,以便在過濾器中對資料進行編解碼處理。
第五步,執行示例:
下面我們修改最一開始的示例中的MyServer、MyClient 的程式碼,如下所示:
acceptor.getFilterChain().addLast(  
"codec",  
new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset  
.forName("UTF-8"))));  
connector.getFilterChain().addLast(  
"codec",  
new ProtocolCodecFilter(new  
CmccSipcCodecFactory(  
Charset.forName("UTF-8"))));  
然後我們在ClientHandler 中傳送一條簡訊:  
public void sessionOpened(IoSession session) {  
SmsObject sms = new SmsObject();  
sms.setSender("15801012253");  
sms.setReceiver("18869693235");  
sms.setMessage("你好!Hello World!");  
session.write(sms);  
}  

最後我們在MyIoHandler 中接收這條簡訊息:
public void messageReceived(IoSession session, Object message)  
throws Exception {  
SmsObject sms = (SmsObject) message;  
log.info("The message received is [" + sms.getMessage() + "]");  
}  

你會看到Server 端的控制檯輸出如下資訊:
The message received is [你好!Hello World!]

(6-2.)複雜的解碼器:
下面我們講解一下如何在解碼器中儲存狀態變數,也就是真正的實現上面所說的Context。
我們假設這樣一種情況,有兩條簡訊:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
他們按照上面的顏色標識傳送,也就是說紅色部分、藍色部分、綠色部分分別傳送(呼叫三次IoSession.write()方法),那麼如果你還用上面的CmccSipcDecoder,將無法工作,因為第一次資料流(紅色部分)傳送過取時,資料是不完整的,無法解析出一條簡訊息,當二次資料流(藍色部分)傳送過去時,已經可以解析出第一條簡訊息了,但是第二條簡訊還是不完整的,需要等待第三次資料流(綠色部分)的傳送。注意:由於模擬資料傳送的規模性問題很麻煩,所以這裡採用了這種極端的例子說明問題,雖不具有典型性,但很能說明問題,這就足夠了,所以不要追究這種傳送訊息是否在真實環境中存在,更不要追究其合理性。

CmccSispcDecoder 類改為如下的寫法:

public class CmccSipcDecoder extends CumulativeProtocolDecoder {  
private final Charset charset;  
private final AttributeKey CONTEXT = new AttributeKey(getClass(),  
"context");  
public CmccSipcDecoder(Charset charset) {  
this.charset = charset;  
}  
@Override  
protected boolean doDecode(IoSession session, IoBuffer in,  
ProtocolDecoderOutput out) throws Exception {  
Context ctx = getContext(session);  
CharsetDecoder cd = charset.newDecoder();  
int matchCount = ctx.getMatchCount();  
int line = ctx.getLine();  
IoBuffer buffer = ctx.innerBuffer;  
String statusLine = ctx.getStatusLine(),  
sender = ctx.getSender(),  
receiver = ctx.getReceiver(),  
length = ctx.getLength(),  
sms = ctx.getSms();  
while (in.hasRemaining()) {  
byte b = in.get();  
matchCount++;  
buffer.put(b);  
if (line < 4 && b == 10) {  
if (line == 0) {  
buffer.flip();  
statusLine = buffer.getString(matchCount, cd);  
statusLine = statusLine.substring(0,  
statusLine.length() - 1);  
matchCount = 0;  
buffer.clear();  
ctx.setStatusLine(statusLine);  
}  
if (line == 1) {  
buffer.flip();  
sender = buffer.getString(matchCount, cd);  
sender = sender.substring(0, sender.length() - 1);  
matchCount = 0;  
buffer.clear();  
ctx.setSender(sender);  
}  
if (line == 2) {  
buffer.flip();  
receiver = buffer.getString(matchCount, cd);  
receiver = receiver.substring(0, receiver.length() -  
1);  
matchCount = 0;  
buffer.clear();  
ctx.setReceiver(receiver);  
}  
if (line == 3) {  
buffer.flip();  
length = buffer.getString(matchCount, cd);  
length = length.substring(0, length.length() - 1);  
matchCount = 0;  
buffer.clear();  
ctx.setLength(length);  
}  
line++;  
} else if (line == 4) {  
if (matchCount == Long.parseLong(length.split(": ")[1]))  
{  
buffer.flip();  
sms = buffer.getString(matchCount, cd);  
ctx.setSms(sms);  
// 由於下面的break,這裡需要呼叫else外面的兩行程式碼  
ctx.setMatchCount(matchCount);  
ctx.setLine(line);  
break;  
}  
}  
ctx.setMatchCount(matchCount);  
ctx.setLine(line);  
}  
if (ctx.getLine() == 4  
&& Long.parseLong(ctx.getLength().split(": ")[1]) == ctx  
.getMatchCount()) {  
SmsObject smsObject = new SmsObject();  
smsObject.setSender(sender.split(": ")[1]);  
smsObject.setReceiver(receiver.split(": ")[1]);  
smsObject.setMessage(sms);  
out.write(smsObject);  
ctx.reset();  
return true;  
} else {  
return false;  
}  
}  
private Context getContext(IoSession session) {  
Context context = (Context) session.getAttribute(CONTEXT);  
if (context == null){  
context = new Context();  
session.setAttribute(CONTEXT, context);  
}  
return context;  
}  
private class Context {  
private final IoBuffer innerBuffer;  
private String statusLine = "";  
private String sender = "";  
private String receiver = "";  
private String length = "";  
private String sms = "";  
public Context() {  
innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);  
}  
private int matchCount = 0;  
private int line = 0;  
public int getMatchCount() {  
return matchCount;  
}  
public void setMatchCount(int matchCount) {  
this.matchCount = matchCount;  
}  
public int getLine() {  
return line;  
}  
public void setLine(int line) {  
this.line = line;  
}  
public String getStatusLine() {  
return statusLine;  
}  
public void setStatusLine(String statusLine) {  
this.statusLine = statusLine;  
}  
public String getSender() {  
return sender;  
}  
public void setSender(String sender) {  
this.sender = sender;  
}  
public String getReceiver() {  
return receiver;  
}  
public void setReceiver(String receiver) {  
this.receiver = receiver;  
}  
public String getLength() {  
return length;  
}  
public void setLength(String length) {  
this.length = length;  
}  
public String getSms() {  
return sms;  
}  
public void setSms(String sms) {  
this.sms = sms;  
}  
public void reset() {  
this.innerBuffer.clear();  
this.matchCount = 0;  
this.line = 0;  
this.statusLine = "";  
this.sender = "";  
this.receiver = "";  
this.length = "";  
this.sms = "";  
}  
}  
}  

這裡我們做了如下的幾步操作:
(1.) 所有記錄狀態的變數移到了Context 內部類中,包括記錄讀到簡訊協議的哪一行的line。每一行讀取了多少個位元組的matchCount,還有記錄解析好的狀態行、傳送者、接受者、簡訊內容、累積資料的innerBuffer 等。這樣就可以在資料不能完全解碼,等待下一次doDecode()方法的呼叫時,還能承接上一次呼叫的資料。
(2.) 在 doDecode()方法中主要的變化是各種狀態變數首先是從Context 中獲取,然後操作之後,將最新的值setXXX()到Context 中儲存。
(3.) 這裡注意doDecode()方法最後的判斷,當認為不夠解碼為一條簡訊息時,返回false,也就是在本次資料流解碼中不要再呼叫doDecode()方法;當認為已經解碼出一條簡訊息時,輸出短訊息,然後重置所有的狀態變數,返回true,也就是如果本次資料流解碼中還有沒解碼完的資料,繼續呼叫doDecode()方法。下面我們對客戶端稍加改造,來模擬上面的紅、藍、綠三次傳送聊天簡訊息的情況:
MyClient:
ConnectFuture future = connector.connect(new InetSocketAddress(  
HOSTNAME, PORT));  
future.awaitUninterruptibly();  
session = future.getSession();  
for (int i = 0; i < 3; i++) {  
SmsObject sms = new SmsObject();  
session.write(sms);  
System.out.println("****************" + i);  
}  

這裡我們為了方便演示,不在IoHandler 中傳送訊息,而是直接在MyClient 中傳送,你要注意的是三次傳送