《Netty 權威指南》—— NIO建立的TimeServer原始碼分析
宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,
我們將在TimeServer例程中給出完整的NIO建立的時間伺服器原始碼:
public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用預設值 } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); New Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
我們對NIO建立的TimeServer進行下簡單分析,8-15行跟之前的一樣,設定監聽埠。16-17行建立了一個被稱為MultiplexerTimeServer的多路複用類,它是個一個獨立的執行緒,負責輪詢多路複用器Selctor,可以處理多個客戶端的併發接入,現在我們繼續看MultiplexerTimeServer的原始碼:
public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、繫結監聽埠 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } /* * (non-Javadoc) * * @see java.lang.Runnable#run() */ @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重複釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求訊息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0位元組,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
由於這個類相比於傳統的Socket程式設計稍微複雜一些,在此我們進行詳細分析,我們從如下幾個關鍵步驟講解多路複用處理類:
14-26行為構造方法,在構造方法中進行資源初始化,建立多路複用器Selector、ServerSocketChannel,對Channel和TCP引數進行配置,例如將ServerSocketChannel設定為非同步非阻塞模式,它的backlog設定為1024。系統資源初始化成功後將ServerSocketChannel註冊到Selector,監聽SelectionKey.OP_ACCEPT操作位;如果資源初始化失敗,例如埠被佔用則退出
39-61行線上程的run方法的while迴圈體中迴圈遍歷selector,它的休眠時間為1S,無論是否有讀寫等事件發生,selector每隔1S都被喚醒一次,selector也提供了一個無參的select方法。當有處於就緒狀態的Channel時,selector將返回就緒狀態的Channel的SelectionKey集合,我們通過對就緒狀態的Channel集合進行迭代,就可以進行網路的非同步讀寫操作
76-83行處理新接入的客戶端請求訊息,根據SelectionKey的操作位進行判斷即可獲知網路事件的型別,通過ServerSocketChannel的accept接收客戶端的連線請求並建立SocketChannel例項,完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。注意,我們需要將新建立的SocketChannel設定為非同步非阻塞,同時也可以對其TCP引數進行設定,例如TCP接收和傳送緩衝區的大小等,作為入門的例子,例程沒有進行額外的引數設定
84-109行用於讀取客戶端的請求訊息,首先建立一個ByteBuffer,由於我們事先無法得知客戶端傳送的碼流大小,作為例程,我們開闢一個1M的緩衝區。然後呼叫SocketChannel的read方法讀取請求碼流,注意,由於我們已經將SocketChannel設定為非同步非阻塞模式,因此它的read是非阻塞的。使用返回值進行判斷,看讀取到的位元組數,返回值有三種可能的結果:
1) 返回值大於0:讀到了位元組,對位元組進行編解碼;
2) 返回值等於0:沒有讀取到位元組,屬於正常場景,忽略;
3) 返回值為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源。
當讀取到碼流以後,我們進行解碼,首先對readBuffer進行flip操作,它的作用是將緩衝區當前的limit設定為position,position設定為0,用於後續對緩衝區的讀取操作。然後根據緩衝區可讀的位元組個數建立位元組陣列,呼叫ByteBuffer的get操作將緩衝區可讀的位元組陣列拷貝到新建立的位元組陣列中,最後呼叫字串的建構函式建立請求訊息體並列印。如果請求指令是”QUERY TIME ORDER”則把伺服器的當前時間編碼後返回給客戶端,下面我們看看如果非同步傳送應答訊息給客戶端。
111-119行將應答訊息非同步傳送給客戶端,我們看下關鍵程式碼,首先將字串編碼成位元組陣列,根據位元組陣列的容量建立ByteBuffer,呼叫ByteBuffer的put操作將位元組陣列拷貝到緩衝區中,然後對緩衝區進行flip操作,最後呼叫SocketChannel的write方法將緩衝區中的位元組陣列傳送出去。需要指出的是,由於SocketChannel是非同步非阻塞的,它並不保證一次能夠把需要傳送的位元組陣列傳送完,此時會出現“寫半包”問題,我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的ByteBuffer傳送完畢,可以通過ByteBuffer的hasRemain()方法判斷訊息是否傳送完成。此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景,後續的章節會有詳細說明。
使用NIO建立TimeServer伺服器完成之後,我們繼續學習如何建立NIO客戶端。首先還是通過時序圖瞭解關鍵步驟和過程,然後結合程式碼進行詳細分析。