Apache Mina 2.0.x 入門
Apache Mina ,一個高效能 Java 非同步併發網路通訊框架。利用 Mina 可以高效地完成以下任務:
Mina 的 API 當前主要有三個分支,分別是:
- 2.0.x 目前處於 SVN trunk 上的版本, Mina 社群對該版本的 API 進行了全新的設計
- 1.1.x 為當前用於產品開發的版本,適用於 5.0 以上的 JDK ,最新版本為 1.1.5
- 1.0.x 是 1.1.x 的 JDK 1.4 的相容版本,最新版本為 1.0.8
這裡將要介紹的是 2.0.x 版。雖然當前的穩定版本還是 1.1.x ,但是按照 Mina 團隊之前的開發計劃, 2.0.x 即將在 08 年夏季正式釋出,並且在 2.0.x 中對 Spring 等 IoC 的整合進行了簡化,添加了基於 OGNL 的 JMX 遠端管理支援,使用基於 Java Annotation 的全新 API 大大簡化了狀態機程式設計,新的基於 Apache APR 的基礎 I/O 元件促進了進一步的效率提升(據官方評測, APR 的效率較之 Sun NIO 要高出約 10%)。由於這一系列的重大改進,使得 2.0.x 成為十分令人期待的一個版本,無論是 Mina 新手還是老使用者,如果你對這個專案抱有興趣,便很有必要提前對這個版本進行一些瞭解。
首先讓我們對非同步 I/O 做一些基本的瞭解。非同步 I/O 模型大體上可以分為兩種,反應式( Reactive )模型和前攝式( Proactive )模型:
傳統的 select / epoll / kqueue 模型,以及 Java NIO 模型,都是典型的反應式模型,即應用程式碼對 I/O 描述符進行註冊,然後等待 I/O 事件。當某個或某些 I/O 描述符所對應的 I/O 裝置上產生 I/O 事件(可讀、可寫、異常等)時,系統將發出通知,於是應用便有機會進行 I/O 操作並避免阻塞。由於在反應式模型中應用程式碼需要根據相應的事件型別採取不同的動作,最常見的結構便是巢狀的 if {...} else {...}
前攝式模型則恰恰相反。在前攝式模型中,應用程式碼主動地投遞非同步操作而不管 I/O 裝置當前是否可讀或可寫。投遞的非同步 I/O 操作被系統接管,應用程式碼也並不阻塞在該操作上,而是指定一個回撥函式並繼續自己的應用邏輯。當該非同步操作完成時,系統將發起通知並呼叫應用程式碼指定的回 調函式。在前攝式模型中,程式邏輯由各個回撥函式串聯起來:非同步操作 A 的回調發起非同步操作 B ,B 的回撥再發起非同步操作 C ,以此往復。 Mina 便是一個前攝式的非同步 I/O 框架。
前攝式模型相較於反射式模型往往更加難以程式設計。然而在具有原生非同步 I/O 支援的作業系統中(例如支援 IO Completion Port 的 Win32 系統),採用前攝式模型往往可以取得比反應式模型更佳的效率。在沒有原生非同步 I/O 支援的系統中,也可以使用傳統的反應式 API 對前攝式模型予以模擬。在現代的軟硬體系統中,使用 epoll 和 kqueue 的前攝式模型實現同樣可以輕鬆解決 C10K 問題。前攝式模型的一個顯著優勢是在實現複雜邏輯的時候不需要藉助於狀態機。因為狀態機已經隱含在由回撥串聯起來的非同步操作鏈當中了。如果上述內容難以理解,可以參考 Boost.Asio ,這是一個相當優秀的跨平臺 C++ 前攝式 I/O 模型實現。
當然,對於程式設計師來說,還是直接看程式碼來得最為直接: Show me the code! 好,以下我們以官方文件上的一個簡單的 TCP Time Server 為示例對 Mina 的基本伺服器程式設計予以剖析。該伺服器的功能是監聽本地所有介面的 8150 埠,當有客戶端連線建立時便向客戶端以文字方式傳送當前時間,並關閉連線。使用 Time Server 的目的在於
- 只需實現伺服器端程式碼,客戶端可有普通 telnet 程式代替
- 使用文字協議,可利用 Mina 內建的 TextLineCodecFactory 來作為協議解析器
Time Server 原始碼分析
以下便是完整的服務端程式碼,稍後再逐行進行分析:
1 package test.mina.time.server;
2
3 import java.io.IOException;
4 import java.net.InetSocketAddress;
5 import java.util.Date;
6
7 import org.apache.commons.logging.Log;
8 import org.apache.commons.logging.LogFactory;
9 import org.apache.mina.common.IoAcceptor;
10 import org.apache.mina.common.IoHandlerAdapter;
11 import org.apache.mina.common.IoSession;
12 import org.apache.mina.filter.codec.ProtocolCodecFilter;
13 import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
14 import org.apache.mina.filter.logging.LoggingFilter;
15 import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
16
17 public class TimeServer {
18
19 static Log log = LogFactory.getLog( TimeServer .class );
20
21 public static void main( final String[] args ) {
22 final IoAcceptor acceptor = new NioSocketAcceptor();
23
24 acceptor.setHandler( new IoHandlerAdapter() {
25
26 @Override
27 public void messageSent( final IoSession session,
28 final Object message )
29 throws Exception {
30 session.close();
31 }
32
33 @Override
34 public void sessionOpened( final IoSession session )
35 throws Exception {
36 session.write( new Date() );
37 }
38
39 } );
40
41 acceptor.getFilterChain().addLast( "codec",
42 new ProtocolCodecFilter( new TextLineCodecFactory() ) );
43 acceptor.getFilterChain().addLast( "logging", new LoggingFilter() );
44
45 try {
46 acceptor.bind( new InetSocketAddress( 8150 ) );
47 }
48 catch( final IOException e ) {
49 log.error( "Bind error: ", e );
50 }
51 }
52
53 }
建立監聽器
22 final IoAcceptor acceptor = new NioSocketAcceptor();
在傳統服務端程式設計中,對於一個 TCP 伺服器,我們需要先建立一個監聽套接字。在 Mina 中,我們建立的並不是一個監聽套接字,而是一個監聽套接字工廠,或者稱之為“監聽器( acceptor )”。該概念對映到 Mina API 中,就是 IoAcceptor 介面及其各個實現類。
傳統的 BSD Socket API 中的監聽套接字以及其在 Java 中的對等物 java.net.ServerSocket 都是套接字工廠,其任務是在某個本地地址上進行監聽,並在有客戶端連線到來時產生一個與客戶端進行通訊的套接字。而 Mina 的 IoAcceptor 作為監聽套接字的工廠可以接受一個包含多個本地地址的集合, IoAcceptor 會自行鍼對這個集合中的每個本地地址分別建立監聽套接字並進行監聽,並且在監聽器銷燬時進行適當的資源清理。這樣便省去了我們建立自行維護多個監聽套接字的麻煩。
監聽套接字由 IoAcceptor 來接管,那麼服務端接受客戶端連線後產生的套接字又由誰接管呢?在 Mina 的術語中,一個 TCP 連線被稱作一個“會話( session )”,對應的 Mina API 是 IoSession 介面。每當服務端接受一個客戶端連線,便會創建出一個新的 IoSession 物件,通過該物件就可以對新建立的 TCP 連線進行各種操作。
在這個示例中,我們選用基於 Java NIO 的監聽器 實現 NioSocketAcceptor 。之後,每當伺服器接受一個客戶端連線, IoAccetpor 都會產生一個代表客戶端和伺服器 TCP 連線的 IoSession 物件。
設定事件回撥
24 acceptor.setHandler( new IoHandlerAdapter() {
25
26 @Override
27 public void messageSent( final IoSession session,
28 final Object message )
29 throws Exception {
30 session.close();
31 }
32
33 @Override
34 public void sessionOpened( final IoSession session )
35 throws Exception {
36 session.write( new Date() );
37 }
38
39 } );
然後,我們要讓這個 IoAcceptor 創建出的 IoSession 物件知道在各種事件發生時應該如何進行處理。這實際上就是文章開頭所描述的反應式模型的應用層介面。只是反應式模型中的事件分支判斷部分被 Mina 封裝了起來,只暴露出包含了各種事件回撥的 IoHandler 介面。可供處理的事件回撥包括:
- sessionCreated(IoSession)
IoSession 物件被建立時的回撥,一般用於進行會話初始化操作。注意,與 sessionOpened(IoSession) 回撥不同, IoSession 物件的建立並不意味著對應的底層 TCP 連線的建立,而僅僅代表它的字面意思:一個 IoSession 物件被創建出來了。
- sessionOpened(IoSession)
IoSession 物件被開啟時的回撥。在 TCP 中,該事件是在 TCP 連線建立時觸發的,一般可用於發起連線建立後的握手、認證等操作。
- sessionClosed(IoSession)
IoSession 物件被關閉時的回撥。在 TCP 中,該事件是在 TCP 連線斷開時觸發的。一般可用於會話資源的清理等操作。
- sessionIdle(IoSession, IdleStatus)
IoSession 物件超時時的回撥。當一個 IoSession 物件在指定的超時時長內沒有讀寫事件發生,就會觸發該事件,一般可用於通知伺服器斷開長時間閒置的連線等處理。具體的超時設定可由 IoService.setWriteIdleTime(int) 、 IoService.setReadIdleTime(int) 和 IoService.setBothIdleTime(int) 設定。
- messageReceived(IoSession, Object)
當接收到 IoSession 對端傳送的資料時的回撥。
- messageSent(IoSession, Object)
當傳送給 IoSession 對端的資料傳送成功時的回撥。
- exceptionCaught(IoSession, Throwable)
當會話過程中出現異常時的回撥。通常用於錯誤處理。
然而,並非每個應用都對所有這些事件感興趣,要實現所有這些方法未免繁瑣,因此 Mina 提供了抽象類 IoHandlerAdapter ,它實現了各個事件的預設處理——也就是不處理。因此,通常我們只需要繼承 IoHandlerAdapter 並覆蓋需要處理的事件回撥就可以了。在 Timer Server 示例中,回想一下我們設計的功能——當有客戶端連線建立時便向客戶端以文字方式傳送當前時間,並關閉連線。為了實現這個功能,我們需要實現兩個事件回撥:首先,在 sessionOpened(IoSession) 事件回撥中向對端傳送當前日期,其次,在 messageSent(IoSession, Object) 事件回撥中關閉連線。以上便是第 24 至 39 行所建立的匿名類完成的事情。
需要注意的是 sessionOpened 方法中的這一行:
36 session.write( new Date() );
這裡的 IoSession.write(Object) 方法便是一個非同步方法。對該方法的呼叫並不會阻塞,而是向 Mina 投遞了一個非同步寫操作,並返回一個可用於對已投遞非同步寫操作進行控制的 WriteFuture 物件。例如,通過呼叫 WriteFuture 的 await 方法或 awaitUninterruptibly() ,我們就可以同步等待該非同步操作的完成。配置過濾器鏈
41 acceptor.getFilterChain().addLast( "codec",
42 new ProtocolCodecFilter( new TextLineCodecFactory() ) );
43 acceptor.getFilterChain().addLast( "logging", new LoggingFilter() );接下來,是對過濾器鏈的配置。過濾器鏈可以被當作一條兩端分別連線伺服器和客戶端的管道,管道中首尾相接地裝上零個或多個過濾器。每個過濾器都可對 通過的資料進行任意的操作,包括增加、刪除、更新、型別轉換等。先裝上的過濾器更靠近遠端端點(客戶端),後裝上的更靠近本地端點(伺服器)。 41 至 43 行先後向 IoAcceptor 的過濾器鏈中添加了兩個過濾器,分別名為“ codec ”和“ logging ”。後者很好理解,其作用就是對 IoSession 物件上發生的各種事件進行日誌記錄。而前者就要多費一些口舌來解釋了。
協議編解碼器
我們知道, TCP 本身只是一個可靠位元組流協議, TCP 層面上 的 二進位制資料流不具備任何的邊界和結構,只是純粹的位元組流。而在應用層面上,我們在不同通訊節點間處理和交換的——也就是應用構建人員直接關心的——是應用 域物件(application domain object)。這就產生了矛盾:應用構建人員需要具有特定型別的域物件來適應具體問題域的需求,而在 TCP 層面,我們手裡只有一股股死板的二進位制資料流。為了解決這種矛盾,為底層的二進位制流賦予個性,向上層應用提供鮮活的域物件,我們就需要將二者互相轉換。於 是就引入了一對自古以來就繁瑣乏味的工作:打包和拆包。
打包,就是將域物件轉換為二進位制資料包,各個資料包首尾相接,形成二進位制資料流;拆包, 就是從無包邊界的二進位制資料流中將資料包一個個拆分出來並轉換成相應的域物件。由於 TCP 沒有包邊界,相對於打包而言,拆包的工作尤其乏味和易錯。為了對這些操作進行適度的封裝以便重用, Mina 提供了一個有力的工具——協議編解碼器( protocol codec )。簡而言之,協議編解碼器的職責,就是打包和拆包。針對一種型別的域物件,我們需要編寫一個編碼器( encoder )和一個解碼器( decoder ),分別用於打包和拆包。將這對編碼器和解碼器通過一個 ProtocolCodecFactory 包裝起來,就組成了一個協議編解碼器。最後,再用一個 ProtocolCodecFilter 將這個協議編解碼器包裝成一個過濾器,就可以將之插入過濾器鏈中,來實現二進位制 TCP 資料流與應用域物件的自動轉換了。
解釋完了原理,我們再回到 Time Server 的示例中來。我們的 Time Server 很簡單,但是麻雀雖小五臟俱全,這裡也同樣存在著打包和拆包的問題。首先我們來確定一下域物件。仔細觀察一下 24 至 39 行中構造的 IoHandlerAdapter 匿名子類,我們就可以發現,在整個客戶端伺服器會話過程中,除了 TCP 建立和斷開過程中的握手訊息以外,唯一的資料 I/O 就是在 sessionOpened(IoSession) 事件回撥中由伺服器向客戶端傳送的 Date 物件。但是,為了利用 Mina 本身提供的 TextLineCodecFactory ,我們並不採用 Date 作為域物件型別,而採用 String ,藉助於 Date.toString() 方法,這個選擇並不會導致什麼問題。 TextLineCodecFactory 提供了一套面向字串文字行的協議編解碼器。它將每個傳入編碼器的字串作為單獨的一行文字打包進 TCP 流,並通過解碼器將 TCP 流中的文字以行尾單位轉換為 String 物件。這樣,就方便地實現了 Time Server 的打包和拆包。
至此,我們已經完成了 Time Server 的大部分編碼工作:我們通過 IoAcceptor 建立了監聽套接字,為後續將要產生的 IoSession 物件設定了相應的事件回撥處理,還配置了過濾器鏈,並在過濾器鏈中嵌入了 TextLineCodecFactory 協議編解碼器。圖 1 描述出了 Time Server 的結構與資料流向:
繫結監聽套接字
圖 1. Apache Mina 2.0.x Time Server
45 try {
46 acceptor.bind( new InetSocketAddress( 8150 ) );
47 }
48 catch( final IOException e ) {
49 log.error( "Bind error: ", e );
50 }好了,萬事俱備只欠東風。最後,我們只需要開啟監聽埠,就萬事大吉了。上文中我們提到過可以為 IoAcceptor 設定多個監聽地址,但這裡我們只需要監聽通配地址 0.0.0.0 上的 8150 埠就可以了,因此直接在 IoAcceptor.bind(SocketAddress) 中指定該監聽地址即可。
IoAcceptor.bind(SocketAddress) 並不僅僅是傳統 BSD Socket API 中的 socket / bind / listen / accept 經典操作序列中的 bind ,而是集四者於一身,以達到簡化程式設計的目的。
Run!
編譯後,一個熱騰騰的 Time Server 就新鮮出爐了!讓我們來跑跑看。首先配置一下日誌策略,將日誌輸出指向標準輸出, log4j.xml 內容如下:
1 <?xml version="1.0" encoding="UTF-8"?>
2 <!DOCTYPE log4j:configuration SYSTEM "log4j.dtd" >
3 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
4 <appender name="stdout" class="org.apache.log4j.ConsoleAppender">
5 <layout class="org.apache.log4j.PatternLayout">
6 <param name="ConversionPattern" value="%p - %c{1} - %m%n" />
7 </layout>
8 </appender>
9 <root>
10 <level value="info" />
11 <appender-ref ref="stdout" />
12 </root>
13 </log4j:configuration>現在執行伺服器,再開啟終端,用 telnet 連線伺服器:
$ telnet localhost 8150
一切正常的話,將獲得類似如下的輸出:
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Tue Jan 15 15:50:38 CST 2008
Connection closed by foreign host.標註出的這行,就是伺服器傳送過來的當前伺服器時間。同時,伺服器端將在標準輸出上有類似如下的日誌輸出:
INFO - LoggingFilter - [/127.0.0.1:3080] CREATED
INFO - LoggingFilter - [/127.0.0.1:3080] OPENED
INFO - LoggingFilter - [/127.0.0.1:3080] SENT: Tue Jan 15 15:50:38 CST 2008 INFO - LoggingFilter - [/127.0.0.1:3080] CLOSED可以看到標註出的與客戶端輸出對應的日誌輸出行。
我們還可以做一個小小的改動來詳細地看一下協議編解碼器的工作過程:將兩個過濾器的新增順序對掉一下。對掉之前,日誌過濾器在協議編解碼過濾器之上(見圖 1 ),因此,在日誌中輸出的是 Date 物件,更具體的說,是 Date.toString() 的結果。對掉之後,日誌過濾器位於協議編解碼過濾器之下,我們便可以看到由編碼器編碼後的日期字串的位元組序列(對應的時間字串是“ Tue Jan 15 16:34:36 CST 2008 ”):
INFO - LoggingFilter - [/192.168.80.180:60144] CREATED
INFO - LoggingFilter - [/192.168.80.180:60144] OPENED
INFO - LoggingFilter - [/192.168.80.180:60144] SENT: HeapBuffer[pos=0 lim=29 cap=32: 54 75 65 20 4A 61 6E 20 31 35 20 31 36 3A 33 34...]
INFO - LoggingFilter - [/192.168.80.180:60144] SENT: HeapBuffer[pos=0 lim=0 cap=0: empty]
INFO - LoggingFilter - [/192.168.80.180:60144] CLOSED小結
區區 53 行程式碼,我們便用 Mina 實現了一個全功能的併發網路時間伺服器。值得注意的是,這並不是一個如同 UNP 中的第一個 Echo Server 示例那樣的以同步方式序列處理客戶端請求的迭代式伺服器,而是一個基於 Java NIO 多路複用機制的高效能非同步併發伺服器。
Time Server 在併發策略上採用的是預設的單執行緒策略。我們可以通過在過濾器鏈中插入一個 ExecutorFilter 來啟用執行緒池來完成 IoHandler 中定義的事件回撥操作。當在事件處理過程中存在檔案 I/O 或資料庫操作等耗時較長的同步阻塞操作時,採用多執行緒的併發策略可以獲取更高的併發度。在 Mina 1.1.x 中,除了 ExecutorFilter 的方式,每個 IoService (各種 IoAcceptor 和以後將要介紹的 IoConnector 都是 IoService 的一種)具備一個 ThreadModel 域,可以使用特定的執行緒模型來制定併發策略。然而這種方式增加了程式設計的複雜度,因此在 2.0.x 中被去除了。
在後續的文章中,還將對 Mina 的客戶端程式設計、 SSL/TLS 程式設計以及 Spring 、 JMX 整合等內容進行介紹。