JAVA NIO 之 Selector 組件
阿新 • • 發佈:2018-10-24
重要功能 ret clas 技術 allocated 單線程 byte exce 重要
NIO 重要功能就是實現多路復用。Selector是SelectableChannel對象的多路復用器。一些基礎知識:
選擇器(Selector):選擇器類管理著一個被註冊的通道集合的信息和它們的就緒狀態。
可選擇通道(SelectableChannel):這個抽象類提供了實現通道的可選擇性所需要的公共方法。它是所有支持就緒檢查的通道類的
父類。例如:ServerSocketChannel、SocketChannel。可選擇通道可以被註冊到選擇器上。
選擇鍵(SelectionKey):選擇鍵封裝了特定的通道與特定的選擇器的註冊關系。
前面的一篇文章NIO簡介中介紹了傳統io的同步阻塞服務器實現,現在來看看NIO多路復用服務器的實現。NIO 利用單線程輪詢事件機制,定位就緒的Channel,決定執行什麽,
僅僅 select()方法階段是阻塞的。這樣一個選擇器避免了之前的多個客服端時切換線程的問題。下面的一張圖能描述這種場景:
代碼實現:
服務器server:
public class SelectSockets { private static int PORT_NUMBER = 9011; /** * allocateDirect(1024) 此方法創建的buffer無法調用array();直接內存 */ private ByteBuffer buffer = ByteBuffer.allocate(1024); public static void main(String[] argv) throws Exception { new SelectSockets().go(argv); } public void go(String[] argv) throws Exception { System.out.println("Listening on port " + PORT_NUMBER); // 創建ServerSocketChannel ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 獲得ServerSocket ServerSocket serverSocket = serverChannel.socket(); // 創建Selector Selector selector = Selector.open(); // 綁定 serverSocket.bind(new InetSocketAddress(PORT_NUMBER)); // false設置為非阻塞模式 serverChannel.configureBlocking(false); // 註冊通道 ////ServerSocketChannel只能註冊SelectionKey.OP_ACCEPT;register(Selector sel, int ops)的ops參數可以通過serverSocketChannel.validOps()獲取。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { //選擇器select有三種方式,這種帶時間的表示,沒有連接阻塞10秒後繼續或者有連接進來時繼續 int n = selector.select(10000); if (n == 0) { continue; } //selectedKeys()已選擇的鍵 Iterator it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = (SelectionKey) it.next(); //檢查是否有效 if (!key.isValid()) { continue; } //accept if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); System.out.println ("Incoming connection from: "+ channel.socket().getRemoteSocketAddress( )); registerChannel(selector, channel, SelectionKey.OP_READ); buffer.clear(); buffer.put("你好,我是服務器!\r\n".getBytes()); buffer.flip(); channel.write(buffer); } //if(key.isReadable())等價於if((key.readyOps( ) & SelectionKey.OP_READ) != 0) if (key.isReadable()) { readHandler(key); } it.remove(); } } } /** * 設置感興趣的通道屬性 * @param selector * @param channel * @param ops * @throws Exception */ protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception { if (channel == null) { return; } channel.configureBlocking(false); channel.register(selector, ops); } /** * 處理讀取數據 * @param key * @throws Exception */ protected void readHandler(SelectionKey key) throws Exception { SocketChannel socketChannel = (SocketChannel) key.channel(); int count; StringBuilder sb = new StringBuilder(); ByteBuffer tmpByteBuffer = ByteBuffer.allocate(1024); //讀取客服端消息 while ((count = socketChannel.read(tmpByteBuffer)) > 0) { tmpByteBuffer.flip(); sb.append(new String(tmpByteBuffer.array())); // 這裏可以回寫給客服端 while (tmpByteBuffer.hasRemaining()) { socketChannel.write(tmpByteBuffer); } tmpByteBuffer.clear(); } System.out.println("客服端"+socketChannel.socket().getRemoteSocketAddress()+"說:"+sb.toString()); if (count < 0) { // Close channel on EOF, invalidates the key socketChannel.close(); } } }
客服端:
/** * @author tangquanbin * @date 2018/10/23 22:23 */ public class Client { private static final int BUFFER_SIZE = 1024; private static int PORT = 9011; private static String[] messages = {"今天讀到一句話,覺得很好:但行好事,莫問前程。"}; public static void main(String[] args) { try { InetAddress inetAddress = InetAddress.getLocalHost(); InetSocketAddress address =new InetSocketAddress(inetAddress, PORT); SocketChannel socketChannel = SocketChannel.open(address); for (String msg: messages) { ByteBuffer myBuffer=ByteBuffer.allocate(BUFFER_SIZE); myBuffer.put(msg.getBytes()); myBuffer.flip(); int bytesWritten = socketChannel.write(myBuffer); logger(String.format("Sending Message...: %s\nbytesWritten...: %d",msg, bytesWritten)); } logger("Closing Client connection..."); socketChannel.close(); } catch (IOException e) { logger(e.getMessage()); e.printStackTrace(); } } public static void logger(String msg) { System.out.println(msg); } }
也可以用telnet命令測試:
telnet 127.0.0.1 9011
JAVA NIO 之 Selector 組件