手動搭建I/O網路通訊框架3:NIO程式設計模型,升級改造聊天室
第一章:手動搭建I/O網路通訊框架1:Socket和ServerSocket入門實戰,實現單聊
第二章:手動搭建I/O網路通訊框架2:BIO程式設計模型實現群聊
在第二章中用BIO程式設計模型,簡單的實現了一個聊天室。但是其最大的問題在解釋BIO時就已經說了:ServerSocket接收請求時(accept()方法)、InputStream、OutputStream(輸入輸出流的讀和寫)都是阻塞的。還有一個問題就是執行緒池,執行緒多了,伺服器效能耗不起。執行緒少了,在聊天室這種場景下,讓使用者等待連線肯定不可取。今天要說到的NIO程式設計模型就很好的解決了這幾個問題。有兩個主要的替換地方:
1.用Channel代替Stream。2.使用Selector監控多條Channel,起到類似執行緒池的作用,但是它只需一條執行緒。
既然要用NIO程式設計模型,那就要說說它的三個主要核心:Selector、Channel、Buffer。它們的關係是:一個Selector管理多個Channel,一個Channel可以往Buffer中寫入和讀取資料。Buffer名叫緩衝區,底層其實是一個數組,會提供一些方法往陣列寫入讀取資料。
Buffer:
不太瞭解Buffer的可以看看這個:https://blog.csdn.net/czx2018/article/details/89502699
常用API:
allocate() - 初始化一塊緩衝區
put() - 向緩衝區寫入資料
get() - 向緩衝區讀資料
filp() - 將緩衝區的讀寫模式轉換
clear() - 這個並不是把緩衝區裡的資料清除,而是利用後來寫入的資料來覆蓋原來寫入的資料,以達到類似清除了老的資料的效果
compact() - 從讀資料切換到寫模式,資料不會被清空,會將所有未讀的資料copy到緩衝區頭部,後續寫資料不會覆蓋,而是在這些資料之後寫資料
mark() - 對position做出標記,配合reset使用
reset() - 將position置為標記值
簡單地說:Buffer實質上是個陣列,有兩個關鍵的指標,一個position代表當前資料寫入到哪了、一個limit代表限制。初始化時設定了陣列長度,這limit就是陣列的長度。如:設定intBuffer.allocate(10),最大儲存10個int資料,寫入5五個資料後,需要讀取資料了。用filp()轉換讀寫模式後,limit=position,position=0。也就是說從0開始讀,只能讀到第五個。讀完後這個緩衝區就需要clear()了,實際上並沒有真的去清空資料,而是position和limit兩個指標又回到了初始化的位置,接著又可以寫入資料了,反正陣列下標相同重新寫入資料會覆蓋,就沒必要真的去清空了。
Channel:
Channel(通道)主要用於傳輸資料,然後從Buffer中寫入或讀取。它們兩個結合起來雖然和流有些相似,但主要有以下幾點區別:
1.流是單向的,可以發現Stream的輸入流和輸出流是獨立的,它們只能輸入或輸出。而通道既可以讀也可以寫。
2.通道本身不能存放資料,只能藉助Buffer。
3.Channel支援非同步。
Channel有如下三個常用的類:FileChannel、SocketChannel、ServerSocketChannel。從名字也可以看出區別,第一個是對檔案資料的讀寫,後面兩個則是針對Socket和ServerSocket,這裡我們只是用後面兩個。更詳細的用法可以看:https://www.cnblogs.com/snailclimb/p/9086335.html,下面的程式碼中也會用到,會有詳細的註釋。
Selector
多個Channel可以註冊到Selector,就可以直接通過一個Selector管理多個通道。Channel在不同的時間或者不同的事件下有不同的狀態:
1.客戶端的SocketChannel和伺服器端建立連線,SocketChannel狀態就是Connect。
2.伺服器端的ServerSocketChannel接收了客戶端的請求,ServerSocketChannel狀態就是Accept。
3.當SocketChannel或ServerSocketChannel有資料可讀,那麼它們的狀態就是Read。
4.當可以向Channel中寫資料時,那麼它們的狀態就是Write。
具體的使用見下面程式碼註釋或看https://www.cnblogs.com/snailclimb/p/9086334.html
NIO程式設計模型
NIO程式設計模型工作流程:
1.首先會建立一個Selector,用來監視管理各個不同的Channel,也就是不同的客戶端。相當於取代了原來BIO的執行緒池,但是它只需一個執行緒就可以處理多個Channel,沒有了執行緒上下文切換帶來的消耗,很好的優化了效能。
2.建立一個ServerSocketChannel監聽通訊埠,並註冊到Selector,讓Seletor監視這個通道的Accept狀態,也就是接收客戶端請求的狀態。
3.此時客戶端ClientA請求伺服器,那麼Selector就知道了有客戶端請求進來。這時候我們可以得到客戶端的SocketChannel,併為這個通道註冊Read狀態,也就是Selector會監聽ClientA發來的訊息。
4.一旦接收到ClientA的訊息,就會用其他客戶端的SocketChannel的Write狀態,向它們轉發ClientA的訊息。
上程式碼之前,還是先說說各個類的作用:
相比較BIO的程式碼,NIO的程式碼還少了一個類,那就是伺服器端的工作執行緒類。沒了執行緒池,自然也不需要一個單獨的執行緒去服務客戶端。客戶端還是需要一個單獨的執行緒去等待使用者輸入,因為使用者隨時都可能輸入資訊,這個沒法預見,只能阻塞式的等待。
ChatServer:伺服器端的唯一的類,作用就是通過Selector監聽Read和Accept事件,並針對這些事件的型別,進行不同的處理,如連線、轉發。
ChatClient:客戶端,通過Selector監聽Read和Connect事件。Read事件就是獲取伺服器轉發的訊息然後顯示出來;Connect事件就是和伺服器建立連線,建立成功後就可以傳送訊息。
UserInputHandler:專門等待使用者輸入的執行緒,和BIO沒區別。
ChatServer
public class ChatServer { //設定緩衝區的大小,這裡設定為1024個位元組 private static final int BUFFER = 1024; //Channel都要配合緩衝區進行讀寫,所以這裡建立一個讀緩衝區和一個寫緩衝區 //allocate()靜態方法就是設定快取區大小的方法 private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER); private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER); //為了監聽埠更靈活,再不寫死了,用一個建構函式設定需要監聽的埠號 private int port; public ChatServer(int port) { this.port = port; } private void start() { //建立ServerSocketChannel和Selector並開啟 try (ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open()) { //【重點,實現NIO程式設計模型的關鍵】configureBlocking設定ServerSocketChannel為非阻塞式呼叫,Channel預設的是阻塞的呼叫方式 server.configureBlocking(false); //繫結監聽埠,這裡不是給ServerSocketChannel繫結,而是給ServerSocket繫結,socket()就是獲取通道原生的ServerSocket或Socket server.socket().bind(new InetSocketAddress(port)); //把server註冊到Selector並監聽Accept事件 server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("啟動伺服器,監聽埠:" + port); while (true) { //select()會返回此時觸發了多少個Selector監聽的事件 if(selector.select()>0) { //獲取這些已經觸發的事件,selectedKeys()返回的是觸發事件的所有資訊 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //迴圈處理這些事件 for (SelectionKey key : selectionKeys) { handles(key, selector); } //處理完後清空selectedKeys,避免重複處理 selectionKeys.clear(); } } } catch (IOException e) { e.printStackTrace(); } } //處理事件的方法 private void handles(SelectionKey key, Selector selector) throws IOException { //當觸發了Accept事件,也就是有客戶端請求進來 if (key.isAcceptable()) { //獲取ServerSocketChannel ServerSocketChannel server = (ServerSocketChannel) key.channel(); //然後通過accept()方法接收客戶端的請求,這個方法會返回客戶端的SocketChannel,這一步和原生的ServerSocket類似 SocketChannel client = server.accept(); client.configureBlocking(false); //把客戶端的SocketChannel註冊到Selector,並監聽Read事件 client.register(selector, SelectionKey.OP_READ); System.out.println("客戶端[" + client.socket().getPort() + "]上線啦!"); } //當觸發了Read事件,也就是客戶端發來了訊息 if (key.isReadable()) { SocketChannel client = (SocketChannel) key.channel(); //獲取訊息 String msg = receive(client); System.out.println("客戶端[" + client.socket().getPort() + "]:" + msg); //把訊息轉發給其他客戶端 sendMessage(client, msg, selector); //判斷使用者是否退出 if (msg.equals("quit")) { //解除該事件的監聽 key.cancel(); //更新Selector selector.wakeup(); System.out.println("客戶端[" + client.socket().getPort() + "]下線了!"); } } } //編碼方式設定為utf-8,下面字元和字串互轉時用得到 private Charset charset = Charset.forName("UTF-8"); //接收訊息的方法 private String receive(SocketChannel client) throws IOException { //用緩衝區之前先清空一下,避免之前的資訊殘留 read_buffer.clear(); //把通道里的資訊讀取到緩衝區,用while迴圈一直讀取,直到讀完所有訊息。因為沒有明確的類似\n這樣的結尾,所以要一直讀到沒有位元組為止 while (client.read(read_buffer) > 0) ; //把訊息讀取到緩衝區後,需要轉換buffer的讀寫狀態,不明白的看看前面的Buffer的講解 read_buffer.flip(); return String.valueOf(charset.decode(read_buffer)); } //轉發訊息的方法 private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException { msg = "客戶端[" + client.socket().getPort() + "]:" + msg; //獲取所有客戶端,keys()與前面的selectedKeys不同,這個是獲取所有已經註冊的資訊,而selectedKeys獲取的是觸發了的事件的資訊 for (SelectionKey key : selector.keys()) { //排除伺服器和本客戶端並且保證key是有效的,isValid()會判斷Selector監聽是否正常、對應的通道是保持連線的狀態等 if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) { SocketChannel otherClient = (SocketChannel) key.channel(); write_buffer.clear(); write_buffer.put(charset.encode(msg)); write_buffer.flip(); //把訊息寫入到緩衝區後,再把緩衝區的內容寫到客戶端對應的通道中 while (write_buffer.hasRemaining()) { otherClient.write(write_buffer); } } } } public static void main(String[] args) { new ChatServer(8888).start(); } }
ChatClient
public class ChatClient { private static final int BUFFER = 1024; private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER); private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER); //宣告成全域性變數是為了方便下面一些工具方法的呼叫,就不用try with resource了 private SocketChannel client; private Selector selector; private Charset charset = Charset.forName("UTF-8"); private void start() { try { client=SocketChannel.open(); selector=Selector.open(); client.configureBlocking(false); //註冊channel,並監聽SocketChannel的Connect事件 client.register(selector, SelectionKey.OP_CONNECT); //請求伺服器建立連線 client.connect(new InetSocketAddress("127.0.0.1", 8888)); //和伺服器一樣,不停的獲取觸發事件,並做相應的處理 while (true) { selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey key : selectionKeys) { handle(key); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); }catch (ClosedSelectorException e){ //當用戶輸入quit時,在send()方法中,selector會被關閉,而在上面的無限while迴圈中,可能會使用到已經關閉了的selector。 //所以這裡捕捉一下異常,做正常退出處理就行了。不會對伺服器造成影響 } } private void handle(SelectionKey key) throws IOException { //當觸發connect事件,也就是伺服器和客戶端建立連線 if (key.isConnectable()) { SocketChannel client = (SocketChannel) key.channel(); //finishConnect()返回true,說明和伺服器已經建立連線。如果是false,說明還在連線中,還沒完全連線完成 if(client.finishConnect()){ //新建一個新執行緒去等待使用者輸入 new Thread(new UserInputHandler(this)).start(); } //連線建立完成後,註冊read事件,開始監聽伺服器轉發的訊息 client.register(selector,SelectionKey.OP_READ); } //當觸發read事件,也就是獲取到伺服器的轉發訊息 if(key.isReadable()){ SocketChannel client = (SocketChannel) key.channel(); //獲取訊息 String msg = receive(client); System.out.println(msg); //判斷使用者是否退出 if (msg.equals("quit")) { //解除該事件的監聽 key.cancel(); //更新Selector selector.wakeup(); } } } //獲取訊息 private String receive(SocketChannel client) throws IOException{ read_buffer.clear(); while (client.read(read_buffer)>0); read_buffer.flip(); return String.valueOf(charset.decode(read_buffer)); } //傳送訊息 public void send(String msg) throws IOException{ if(!msg.isEmpty()){ write_buffer.clear(); write_buffer.put(charset.encode(msg)); write_buffer.flip(); while (write_buffer.hasRemaining()){ client.write(write_buffer); } if(msg.equals("quit")){ selector.close(); } } } public static void main(String[] args) { new ChatClient().start(); } }
UserInputHandler
public class UserInputHandler implements Runnable { ChatClient client; public UserInputHandler(ChatClient chatClient) { this.client=chatClient; } @Override public void run() { BufferedReader read=new BufferedReader( new InputStreamReader(System.in) ); while (true){ try { String input=read.readLine(); client.send(input); if(input.equals("quit")) break; } catch (IOException e) { e.printStackTrace(); } } } }
測試執行:之前用的是win10的終端執行的,以後直接用IDEA執行,方便些。不過一個類同時執行多個,以實現多個客戶端的場景,需要先做以下設定
設定完後,就可以同時執行兩個ChatClient了,上圖中得Unnamed就是第二個ChatClient,選中後點擊右邊執行按鈕就行了。效果如下: