IO與NIO對於非同步Socket的處理
阿新 • • 發佈:2019-02-18
以下的內容以程式碼為主,簡單的展示了傳統IO流和NIO流對Socket請求的處理。
簡單來說,傳統IO流想要處理多個客戶端的Socket請求,它必須要不斷的建立新的執行緒來專門為連入的Socket請求進行處理,如果連入的Socket請求很多,並且來自不同的IP或者埠就必須要不斷的建立執行緒,對系統資源會造成很大的佔用。
下面就是傳統IO流非同步處理Socket請求的程式碼:
package com.firstdata.IOSocket; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; public class SocketCase { public static final int Port = 4495; public static ServerSocket server = null; public static void main(String[] args) throws IOException { // TODO Auto-generated method stub init(); System.out.println("Finish init!"); while (true) { Socket socket = server.accept(); System.out.println("Client connected!" + socket.getPort()); try { SocketProcess socketProcess = new SocketProcess(socket); System.out.println("Start thread!"); // Thread thread = new Thread(socketCase); Thread thread = new Thread(socketProcess); thread.start(); } catch (Exception e) { e.printStackTrace(); } } } private static void init() throws IOException { server = new ServerSocket(); server.setSoTimeout(0); server.setReuseAddress(true); server.bind(new InetSocketAddress(4495)); } }
除了上面這個主函式和初始化方法,還需要下面這個實現多執行緒的類:
package com.firstdata.IOSocket; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class SocketProcess implements Runnable { private static Socket socket = null; public SocketProcess(Socket socket) { SocketProcess.socket = socket; } @Override public void run() { // TODO Auto-generated method stub try { handlerSocket(); } catch (Exception e) { e.printStackTrace(); } } public static void handlerSocket() { InputStream rd = null; OutputStream bw = null; try { rd = getBufferedReader(socket); bw = getBufferedWriter(socket); byte[] ReqBuff = new byte[1000]; System.out.println("Ready receive the request message!"); while (rd.read(ReqBuff) > 0) { System.out.println("start"); System.out.println(new String(ReqBuff)); System.out.println("finish"); } } catch (Exception e) { e.printStackTrace(); try { rd.close(); bw.close(); socket.close(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } private static InputStream getBufferedReader(Socket socket) throws IOException { InputStream in = socket.getInputStream(); return in; } private static OutputStream getBufferedWriter(Socket socket) throws IOException { OutputStream out = socket.getOutputStream(); return out; } }
有上面2個類就能夠構成一個簡單的非同步Socket服務端類
對於NIO而已,NIO會有一個選擇器的出現,有點類似在傳統IO流的處理之後加上一塊分配資料流的工作,這樣能夠使得Socket在不建立執行緒的任務下處理多個來自不同地方的Socket請求,對於NIO原理的介紹,網上有很多很多,下面是個簡單的NIO類:
package com.firstdata.NIOSocket; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class SocketCase { // 通道管理器 private Selector selector; /** * 獲得一個ServerSocket通道,並對該通道做一些初始化的工作 * * @param port 繫結的埠號 * @throws IOException */ public void initServer(int port) throws IOException { // 獲得一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 設定通道為非阻塞 serverChannel.configureBlocking(false); // 將該通道對應的ServerSocket繫結到port埠 serverChannel.socket().bind(new InetSocketAddress(port)); // 獲得一個通道管理器 this.selector = Selector.open(); // 將通道管理器和該通道繫結,併為該通道註冊SelectionKey.OP_ACCEPT事件,註冊該事件後, // 當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。 serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("OP_ACCEPT"); } /** * 採用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理 * * @throws IOException */ public void listen() { System.out.println("服務端啟動成功!"); // 輪詢訪問selector while (true) { // 當註冊的事件到達時,方法返回;否則,該方法會一直阻塞 try { selector.select(); // 獲得selector中選中的項的迭代器,選中的項為註冊的事件 Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 刪除已選的key,以防重複處理 ite.remove(); // 客戶端請求連線事件 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 獲得和客戶端連線的通道 SocketChannel channel = server.accept(); // 設定成非阻塞 channel.configureBlocking(false); // 在這裡可以給客戶端傳送資訊哦 channel.write(ByteBuffer.wrap(new String("向客戶端傳送了一條資訊").getBytes())); // 在和客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權。 channel.register(this.selector, SelectionKey.OP_READ); // 獲得了可讀的事件 } else if (key.isReadable()) { read(key); } } } catch (Exception e) { e.printStackTrace(); } } } /** * 處理讀取客戶端發來的資訊 的事件 * * @param key * @throws IOException */ public void read(SelectionKey key) throws IOException { try { // 伺服器可讀取訊息:得到事件發生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 建立讀取的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(100); int statusCode = channel.read(buffer); System.out.println("length:" + statusCode); if (statusCode != -1) { byte[] data = buffer.array(); String msg = new String(data).trim(); System.out.println("服務端收到資訊:" + msg); ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes()); channel.write(outBuffer);// 將訊息回送給客戶端 } else { channel.close(); } } catch (Exception e) { } } /** * 啟動服務端測試 * * @throws IOException */ public static void main(String[] args) throws IOException { SocketCase server = new SocketCase(); server.initServer(4495); server.listen(); } }
本人只是程式碼的搬運工以及測試員。