《Netty 權威指南》—— AIO 建立的TimeServer原始碼分析
宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,
NIO2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果:
- 通過java.util.concurrent.Future類來表示非同步操作的結果;
- 在執行非同步操作的時候傳入一個java.nio.channels.
CompletionHandler介面的實現類作為操作完成的回撥。
NIO2.0的非同步套接字通道是真正的非同步非阻塞IO,它對應Unix網路程式設計中的事件驅動IO(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,簡化了NIO的程式設計模型。
下面還是通過程式碼來熟悉NIO2.0 AIO的相關類庫,我們仍舊以時間伺服器為例程進行講解。
AIO 建立的TimeServer原始碼分析
首先看下時間伺服器的主函式:
public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用預設值 } } AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler-001").start(); } }
我們直接從第16行開始看,首先建立非同步的時間伺服器處理類,然後啟動執行緒將AsyncTimeServerHandler拉起,程式碼如下:
public class AsyncTimeServerHandler implements Runnable { private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { asynchronousServerSocketChannel = AsynchronousServerSocketChannel .open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { latch = new CountDownLatch(1); doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); }
我們重點對AsyncTimeServerHandler進行分析,首先看8-15行,在構造方法中,我們首先建立一個非同步的服務端通道AsynchronousServerSocketChannel,然後呼叫它的bind方法繫結監聽埠,如果埠合法且沒被佔用,繫結成功,列印啟動成功提示到控制檯。
線上程的run方法中,第26行我們初始化CountDownLatch物件,它的作用是在完成一組正在執行的操作之前,允許當前的執行緒一直阻塞。在本例程中,我們讓執行緒在此阻塞,防止服務端執行完成退出。在實際專案應用中,不需要啟動獨立的執行緒來處理AsynchronousServerSocketChannel,這裡僅僅是個demo演示。
第24行用於接收客戶端的連線,由於是非同步操作,我們可以傳遞一個
CompletionHandler<AsynchronousSocketChannel,? super A>型別的handler例項接收accept操作成功的通知訊息,在本例程中我們通過AcceptCompletionHandler例項作為handler接收通知訊息,下面,我們繼續對AcceptCompletionHandler進行分析:
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer buffer = ByteBuffer.allocate(1024); result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); attachment.latch.countDown(); } }
CompletionHandler有兩個方法,分別是:
1) public void completed(AsynchronousSocketChannel result,
AsyncTimeServerHandler attachment);
2) public void failed(Throwable exc, AsyncTimeServerHandler attachment);
下面我們分別對這兩個介面的實現進行分析:首先看completed介面的實現,程式碼7-10行,我們從attachment獲取成員變數AsynchronousServerSocketChannel,然後繼續呼叫它的accept方法。可能讀者在此可能會心存疑惑,既然已經接收客戶端成功了,為什麼還要再次呼叫accept方法呢?原因是這樣的:當我們呼叫AsynchronousServerSocketChannel的accept方法後,如果有新的客戶端連線接入,系統將回調我們傳入的CompletionHandler例項的completed方法,表示新的客戶端已經接入成功,因為一個AsynchronousServerSocketChannel可以接收成千上萬個客戶端,所以我們需要繼續呼叫它的accept方法,接收其它的客戶端連線,最終形成一個迴圈。每當接收一個客戶讀連線成功之後,再非同步接收新的客戶端連線。
鏈路建立成功之後,服務端需要接收客戶端的請求訊息,程式碼第8行我們建立新的ByteBuffer,預分配1M的緩衝區。第8行我們通過呼叫AsynchronousSocketChannel的read方法進行非同步讀操作。下面我們看看非同步read方法的引數:
1) ByteBuffer dst:接收緩衝區,用於從非同步Channel中讀取資料包;
2) A attachment:非同步Channel攜帶的附件,通知回撥的時候作為入參使用;
3) CompletionHandler<Integer,? super A>:接收通知回撥的業務handler,本例程中為ReadCompletionHandler。
下面我們繼續對ReadCompletionHandler進行分析:
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) this.channel = channel; } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 如果沒有傳送完成,繼續傳送 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
首先看構造方法,我們將AsynchronousSocketChannel通過引數傳遞到ReadCompletionHandler中當作成員變數來使用,主要用於讀取半包訊息和傳送應答。本例程不對半包讀寫進行具體解說,對此感興趣的可以關注後續章節對Netty半包處理的專題介紹。我們繼續看程式碼,第12-25行是讀取到訊息後的處理,首先對attachment進行flip操作,為後續從緩衝區讀取資料做準備。根據緩衝區的可讀位元組數建立byte陣列,然後通過new String方法建立請求訊息,對請求訊息進行判斷,如果是”QUERY TIME ORDER”則獲取當前系統伺服器的時間,呼叫doWrite方法傳送給客戶端。下面我們對doWrite方法進行詳細分析。
跳到程式碼第28行,首先對當前時間進行合法性校驗,如果合法,呼叫字串的解碼方法將應答訊息編碼成位元組陣列,然後將它拷貝到傳送緩衝區writeBuffer中,最後呼叫AsynchronousSocketChannel的非同步write方法。正如前面介紹的非同步read方法一樣,它也有三個與read方法相同的引數,在本例程中我們直接實現write方法的非同步回撥介面CompletionHandler,程式碼跳到第24行,對傳送的writeBuffer進行判斷,如果還有剩餘的位元組可寫,說明沒有傳送完成,需要繼續傳送,直到傳送成功。
最後,我們關注下failed方法,它的實現很簡單,就是當發生異常的時候,我們對異常Throwable進行判斷,如果是IO異常,就關閉鏈路,釋放資源,如果是其它異常,按照業務自己的邏輯進行處理。本例程作為簡單demo,沒有對異常進行分類判斷,只要發生了讀寫異常,就關閉鏈路,釋放資源。
非同步非阻塞IO版本的時間伺服器服務端已經介紹完畢,下面我們繼續看客戶端的實現。