nio原理和示例程式碼
阿新 • • 發佈:2018-11-04
我正在為學習大資料打基礎中,為了手擼rpc框架,需要懂得nio的原理,在搞懂nio框架前,我會帶著大家手擼一些比較底層的程式碼,當然今後當我們學會了框架,這些繁瑣的程式碼也就不用寫了,但是學一學底層的程式碼也是有好處的嘛。
java.nio全稱java non-blocking IO(實際上是 new io),是指jdk1.4 及以上版本里提供的新api(New IO) ,為所有的原始型別(boolean型別除外)提供快取支援的資料容器,使用它可以提供非阻塞式的高伸縮性網路。
前面我寫的socket的服務端與客戶端的通訊是執行緒阻塞的,這在實際應用場景中並不竟如人意,我們更多需要的是非同步操作,使用者無感知,當我們在操作主執行緒的時候,一些通訊相關的執行緒不應該阻塞我們的主執行緒。我們需要傳送資料,我們只要將請求傳送出去,這時候具體的傳送細節就應該交由底層的作業系統幫我們完成,我們應該可以操作主執行緒繼續完成其他事情。nio就為我們解決這些事情提供了很好的辦法。
學會nio之前我們需要了解這幾個概念:
Channel:
Channel是一個物件,可以通過它讀取和寫入資料。拿 NIO 與原來的 I/O 做個比較,通道就像是流,而且他們面向緩衝區的。 所有資料都通過 Buffer 物件來處理。您永遠不會將位元組直接寫入通道中,相反,您是將資料寫入包含一個或者多個位元組的緩衝區。同樣,您不會直接從通道中讀取位元組,而是將資料從通道讀入緩衝區,再從緩衝區獲取這個位元組。 通道與流的不同之處在於通道是雙向的。而流只是在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類), 而 通道 可以用於讀、寫或者同時用於讀寫。 因為它們是雙向的,所以通道可以比流更好地反映底層作業系統的真實情況。特別是在 UNIX 模型中,底層作業系統通道是雙向的。 緩衝區:先定義一個TimeServer
package com.wenbing.nio; public class TimeServer { public static void main(String[] args) { int port = 8085; if (args != null && args.length < 0) { port = Integer.valueOf(args[0]); } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } }
再定義一個MultiplexerTimeServer去實現Runnable介面,每個通訊的操作交由這一個執行緒去完成。
package com.wenbing.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路複用器、繫結監聽埠 * * @param port */ public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); // 非阻塞 servChannel.configureBlocking(false); // 繫結埠 servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { selector.select(1000); // 查詢存在的活躍的key Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 迭代所有活躍的key,進行操作 Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); // 拿到某個key後,就將其從迭代器裡除去 it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要單個關閉 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求訊息 if (key.isAcceptable()) { // Accept the new connection ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // Read the data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); //將當前時間發回去 String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; //讀到0位元組,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException{ if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
定義TimeClient
package com.wenbing.nio; public class TimeClient { /** * * @param args */ public static void main(String[] args) { int port = 8085; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { //採用預設值 } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start(); } }
定義TimeClientHandle同樣繼承Runnable介面,與上面的MultiplexerTimeServer作用類似。
package com.wenbing.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); System.exit(1); } } } private void doConnect() throws IOException { //如果直接連線成功,則註冊到多路複用器上,傳送請求訊息,讀應答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWriter(socketChannel); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWriter(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send order 2 server succeed."); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判斷連線是否成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWriter(sc); } else { System.exit(1);//連線失敗,程序退出 } } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { //對端鏈路關閉 key.cancel(); sc.close(); } else ;//讀到0位元組,忽略 } } } }
啟動TimeServer和TimeClient的main方法,執行結果如下: TimeServer控制檯列印如下: The time server is start in port : 8085
The time server receive order : QUERY TIME ORDER TimeClient控制檯列印如下: Send order 2 server succeed.
Now is : Sun Nov 04 00:10:56 CST 2018 紙上得來終覺淺,絕知此事要躬行,快去動手自己擼一擼吧。