Java Socket程式設計(非阻塞多執行緒,NIO)
阿新 • • 發佈:2019-02-07
服務端:
伺服器Server類
public class Server implements Runnable { private int port; private volatile boolean stop; private Selector selector; private ServerSocketChannel serverSocketChannel; public Server(int port){ this.port = port; } public void init(){ try { //開啟一個選擇器 selector = Selector.open(); //開啟一個Server-Socket監聽通道 serverSocketChannel = ServerSocketChannel.open(); //設定該通道為非阻塞模式 serverSocketChannel.configureBlocking(false); //繫結埠 serverSocketChannel.socket().bind(new InetSocketAddress(port)); //將通道註冊在選擇器上面,並將準備連線狀態作為通道訂閱時間 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); stop = false; System.out.println("伺服器已經啟動,埠號:" + port); }catch (IOException e){ e.printStackTrace(); } } public void run() { init(); while (!stop){ try { //無論是否有讀寫事件發生,selector每隔1s被喚醒一次 selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); //判斷是否準備好接收新進入的連線 if(selectionKey.isAcceptable()){ ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); //通過ServerSocketChannel的accept()建立SocketChannel例項 //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立 SocketChannel socketChannel = serverSocketChannel.accept(); //設定為非阻塞 socketChannel.configureBlocking(false); //在選擇器註冊,並訂閱讀事件 socketChannel.register(selector,SelectionKey.OP_READ); } if(selectionKey.isReadable()){ SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); //建立byteBuffer,並開闢一個1M的緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //讀取請求碼流,返回讀取到的位元組數 int readBytes = socketChannel.read(byteBuffer); //判斷客戶端是否斷開 if(readBytes < 0){ selectionKey.cancel(); socketChannel.close(); return; } //讀取到位元組,對位元組進行編解碼 if(readBytes>0){ //將緩衝區從寫模式切換到讀模式 byteBuffer.flip(); //根據緩衝區可讀位元組數建立位元組陣列 byte[] bytes = new byte[byteBuffer.remaining()]; //向緩衝區讀資料到位元組陣列 byteBuffer.get(bytes); String expression = new String(bytes,"UTF-8"); System.out.println("伺服器收到訊息:"+expression); } } iterator.remove(); } selectionKeys.clear(); }catch (IOException e){ e.printStackTrace(); } } //selector關閉後會自動釋放裡面管理的資源 if(selector != null){ try { selector.close(); }catch (IOException e){ e.printStackTrace(); } } } }
客戶端:
客戶端Client類
public class Client implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; private String name; public Client(int port,String name){ this("localhost",port,name); } public Client(String host,int port,String name){ this.host = host; this.port = port; this.name = name; } public void init(){ try { //開啟一個選擇器 selector = Selector.open(); //開啟一個Socket監聽通道 socketChannel = SocketChannel.open(); //設定該通道為非阻塞模式 socketChannel.configureBlocking(false); //在非阻塞模式下,該方法在建立連線之前就會返回結果了,後續為了確認連線是否建立成功,可以呼叫finishConnect() socketChannel.connect(new InetSocketAddress(host,port)); //訂閱連線事件 socketChannel.register(selector, SelectionKey.OP_CONNECT); stop = false; }catch (IOException e){ e.printStackTrace(); } } public void run() { init(); int i = 0; while (!stop){ try { //無論是否有讀寫事件發生,selector每隔1s被喚醒一次 selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); //判斷是否連線到伺服器 if(selectionKey.isConnectable()){ //判斷連線是否建立成功 if(socketChannel.finishConnect()){ sendMsg(name+" Connect Success!"); socketChannel.register(selector,SelectionKey.OP_WRITE); } } if(selectionKey.isWritable()){ sendMsg(name+" is saying \"Hello World\"!"+i++); Thread.sleep(1000); } iterator.remove(); } selectionKeys.clear(); }catch (ConnectException e){ System.out.println("連線失敗!"); return; }catch (IOException e){ e.printStackTrace(); }catch (InterruptedException e){ e.printStackTrace(); } } } public void sendMsg(String expression) throws IOException{ byte[] bytes = expression.getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes); //翻轉緩衝區,執行的操作: //1.將limit的位置設為position之後的一個位置 //2.將position的位置重置為0 byteBuffer.flip(); socketChannel.write(byteBuffer); //清空緩衝區 byteBuffer.clear(); } }
測試過程:
測試類TestNio
public class TestNio { public static void main(String[] args) { Thread server = new Thread(new Server(10086)); Thread client1 = new Thread(new Client(10086,"ONE")); Thread client2 = new Thread(new Client(10086,"TWO")); server.start(); client1.start(); client2.start(); } }
測試結果:
伺服器已經啟動,埠號:10086
伺服器收到訊息:TWO Connect Success!
伺服器收到訊息:ONE Connect Success!
伺服器收到訊息:ONE is saying "Hello World"!0
伺服器收到訊息:TWO is saying "Hello World"!0
伺服器收到訊息:ONE is saying "Hello World"!1
伺服器收到訊息:TWO is saying "Hello World"!1
伺服器收到訊息:ONE is saying "Hello World"!2
伺服器收到訊息:TWO is saying "Hello World"!2
伺服器收到訊息:ONE is saying "Hello World"!3
伺服器收到訊息:TWO is saying "Hello World"!3
伺服器收到訊息:ONE is saying "Hello World"!4
伺服器收到訊息:TWO is saying "Hello World"!4
伺服器收到訊息:ONE is saying "Hello World"!5
伺服器收到訊息:TWO is saying "Hello World"!5
伺服器收到訊息:ONE is saying "Hello World"!6
伺服器收到訊息:TWO is saying "Hello World"!6
伺服器收到訊息:ONE is saying "Hello World"!7
伺服器收到訊息:TWO is saying "Hello World"!7
伺服器收到訊息:ONE is saying "Hello World"!8
伺服器收到訊息:TWO is saying "Hello World"!8
伺服器收到訊息:ONE is saying "Hello World"!9
伺服器收到訊息:TWO is saying "Hello World"!9
伺服器收到訊息:ONE is saying "Hello World"!10
伺服器收到訊息:TWO is saying "Hello World"!10