《Netty 權威指南》—— NIO建立的TimeClient原始碼分析
宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,
我們首先還是看下如何對TimeClient進行改造:
public class TimeClient { /** * @param args */ public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用預設值 } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001") .start(); } }
與之前唯一不同的就是我們通過建立TimeClientHandle執行緒來處理非同步連線、讀寫操作,由於TimeClient非常簡單且變更不大,我們重點分析TimeClientHandle,程式碼如下:
public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判斷是否連線成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else System.exit(1);// 連線失敗,程序退出 } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0位元組,忽略 } } } private void doConnect() throws IOException { // 如果直接連線成功,則註冊到多路複用器上,傳送請求訊息,讀應答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else socketChannel.register(selector, SelectionKey.OP_CONNECT); } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } }
與服務端類似,我們通過對關鍵步驟的原始碼進行分析和解讀,讓大家深入瞭解如何建立NIO客戶端以及如何使用NIO的API。
8-19行建構函式用於初始化NIO的多路複用器和SocketChannel物件,需要注意的是建立SocketChannel之後,需要將其設定為非同步非阻塞模式。就像在2.3.3章節中所講的,我們可以設定SocketChannel的TCP引數,例如接收和傳送的TCP緩衝區大小
28-33行用於傳送連線請求,作為示例,連線是成功的,所以不需要做重連操作,因此將其放到迴圈之前。下面我們具體看看doConnect的實現,程式碼跳到第116-123行,首先對SocketChannel的connect()操作進行判斷,如果連線成功,則將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_READ,如果沒有直接連線成功,說明服務端沒有返回TCP握手應答訊息,這並不代表連線失敗,我們需要將SocketChannel註冊到多路複用器Selector上,註冊SelectionKey.OP_CONNECT,當服務端返回TCP syn-ack訊息後,Selector就能夠輪詢到這個SocketChannel處於連線就緒狀態
4-72行在迴圈體中輪詢多路複用器Selector,當有就緒的Channel時,執行第59行的handleInput(key)方法,下面我們就對handleInput方法進行分析。
跳到第68行,我們首先對SelectionKey進行判斷,看它處於什麼狀態。如果是處於連線狀態,說明服務端已經返回ACK應答訊息,我們需要對連線結果進行判斷,呼叫SocketChannel的finishConnect()方法,如果返回值為true,說明客戶端連線成功,如果返回值為false或者直接丟擲IOException,說明連線失敗。在本例程中,返回值為true,說明連線成功。將SocketChannel註冊到多路複用器上,註冊SelectionKey.OP_READ操作位,監聽網路讀操作。然後傳送請求訊息給服務端,下面我們對doWrite(sc)進行分析。程式碼跳到110行,我們構造請求訊息體,然後對其編碼,寫入到傳送緩衝區中,最後呼叫SocketChannel的write方法進行傳送,由於傳送是非同步的,所以會存在“半包寫”問題,此處不再贅述。最後通過hasRemaining()方法對傳送結果進行判斷,如果緩衝區中的訊息全部發送完成,列印”Send order 2 server succeed.
程式碼返回第80行,我們繼續分析下客戶端是如何讀取時間伺服器應答訊息的。如果客戶端接收到了服務端的應答訊息,則SocketChannel是可讀的,由於無法事先判斷應答碼流的大小,我們就預分配1M的接收緩衝區用於讀取應答訊息,呼叫SocketChannel的read()方法進行非同步讀取操作,由於是非同步操作,所以必須對讀取的結果進行判斷,這部分的處理邏輯已經在2.3.3章節詳細介紹過,此處不再贅述。如果讀取到了訊息,則對訊息進行解碼,最後列印結果。執行完成後將stop置為true,執行緒退出迴圈
執行緒退出迴圈後,我們需要對連線資源進行釋放,以實現“優雅退出”。60-66行用於多路複用器的資源釋放,由於多路複用器上可能註冊成千上萬的Channel或者pipe,如果一一對這些資源進行釋放顯然不合適。因此,JDK底層會自動釋放所有跟此多路複用器關聯的資源,JDK的API DOC如下:
到此為止,我們已經將時間伺服器通過NIO完成了改造,並對原始碼進行了分析和解讀,下面分別執行時間伺服器的服務端和客戶端,看執行結果。
服務端執行結果:
客戶端執行結果:
通過原始碼對比分析,我們發現NIO程式設計難度確實比同步阻塞BIO大很多,我們的NIO例程並沒有考慮“半包讀”和“半包寫”,如果加上這些,程式碼將會更加複雜。NIO程式碼既然這麼複雜,為什麼它的應用卻越來越廣泛呢,使用NIO程式設計的優點總結如下:
1) 客戶端發起的連線操作是非同步的,可以通過在多路複用器註冊OP_CONNECT等待後續結果,不需要像之前的客戶端那樣被同步阻塞;
2) SocketChannel的讀寫操作都是非同步的,如果沒有可讀寫的資料它不會同步等待,直接返回,這樣IO通訊執行緒就可以處理其它的鏈路,不需要同步等待這個鏈路可用;
3) 執行緒模型的優化:由於JDK的Selector在Linux等主流作業系統上通過epoll實現,它沒有連線控制代碼數的限制(只受限於作業系統的最大控制代碼數或者對單個程序的控制代碼限制),這意味著一個Selector執行緒可以同時處理成千上萬個客戶端連線,而且效能不會隨著客戶端的增加而線性下降,因此,它非常適合做高效能、高負載的網路伺服器。
JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO2.0,引人注目的是Java正式提供了非同步檔案IO操作,同時提供了與Unix網路程式設計事件驅動IO對應的AIO,下面的2.4章節我們學習下如何利用NIO2.0編寫AIO程式,我們還是以時間伺服器為例進行講解。