《Netty 權威指南》—— AIO建立的TimeClient原始碼分析
宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,
非同步非阻塞IO版本的時間伺服器服務端已經介紹完畢,下面我們繼續看客戶端的實現。
首先看下客戶端主函式的實現,AIO時間伺服器客戶端 TimeClient:
public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用預設值 } } new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler-001").start(); } }
第15行我們通過一個獨立的IO執行緒建立非同步時間伺服器客戶端handler,在實際專案中,我們不需要獨立的執行緒建立非同步連線物件,因為底層都是通過JDK的系統回撥實現的,在後面執行時間伺服器程式的時候,我們會抓取執行緒呼叫堆疊給大家展示。繼續看程式碼, AsyncTimeClientHandler的實現類原始碼如下:
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); client.read( readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer .remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }
由於在AsyncTimeClientHandler中大量使用了內部匿名類,所以程式碼看起來稍微有些複雜,下面我們就對主要程式碼進行詳細解說。
9-17行是構造方法,首先通過AsynchronousSocketChannel的open方法建立一個新的AsynchronousSocketChannel物件。然後跳到第36行,建立CountDownLatch進行等待,防止非同步操作沒有執行完成執行緒就退出。第37行通過connect方法發起非同步操作,它有兩個引數,分別如下:
1) A attachment : AsynchronousSocketChannel的附件,用於回撥通知時作為入參被傳遞,呼叫者可以自定義;
2) CompletionHandler<Void,? super A> handler:非同步操作回撥通知介面,由呼叫者實現。
在本例程中,我們的兩個引數都使用AsyncTimeClientHandler類本身,因為它實現了CompletionHandler介面。
接下來我們看非同步連線成功之後的方法回撥completed方法,程式碼第39行,我們建立請求訊息體,對其進行編碼,然後拷貝到傳送緩衝區writeBuffer中,呼叫AsynchronousSocketChannel的write方法進行非同步寫,與服務端類似,我們可以實現CompletionHandler<Integer, ByteBuffer>介面用於寫操作完成後的回撥,程式碼第45-47行,如果傳送緩衝區中仍有尚未傳送的位元組,我們繼續非同步傳送,如果已經發送完成,則執行非同步讀取操作。
程式碼第64-97行是客戶端非同步讀取時間伺服器服務端應答訊息的處理邏輯,程式碼第49行我們呼叫AsynchronousSocketChannel的read方法非同步讀取服務端的響應訊息,由於read操作是非同步的,所以我們通過內部匿名類實現CompletionHandler<Integer, ByteBuffer>介面,當讀取完成被JDK回撥時,我們構造應答訊息。第56-63行我們從CompletionHandler的ByteBuffer中讀取應答訊息,然後列印結果。
第197-96行,當讀取發生異常時,我們關閉鏈路,同時呼叫CountDownLatch的countDown方法讓AsyncTimeClientHandler執行緒執行完畢,客戶端退出執行。
需要指出的是,正如之前的NIO例程,我們並沒有完整的處理網路的半包讀寫,當對例程進行功能測試的時候沒有問題,但是,如果對程式碼稍加改造,進行壓力或者效能測試,就會發現輸出結果存在問題。
由於半包的讀寫會作為專門的小節在Netty的應用和原始碼分析章節進行詳細講解,在NIO的入門章節我們就不詳細展開介紹,以便讀者能夠將注意力集中在NIO的入門知識上來。
下面的小節我們執行AIO版本的時間伺服器程式,並通過列印執行緒堆疊的方式看下JDK回撥非同步Channel CompletionHandler的呼叫情況。