1. 程式人生 > 其它 >03基於NIO的聊天室案例

03基於NIO的聊天室案例

服務端程式碼實現

  1 package server;
  2 
  3 import java.io.Closeable;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.*;
  8 import java.nio.charset.Charset;
  9 import java.util.Set;
 10 
 11 public class ChatServer {
 12
13 private static final int DEFAULT_PORT = 8888; 14 private static final String QUIT = "quit"; 15 private static final int BUFFER = 1024; 16 17 private ServerSocketChannel server; 18 private Selector selector; 19 private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
20 private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER); 21 private Charset charset = Charset.forName("UTF-8"); 22 private int port; 23 24 public ChatServer() { 25 this(DEFAULT_PORT); 26 } 27 28 public ChatServer(int port) { 29 this.port = port;
30 } 31 32 private void start() { 33 try { 34 server = ServerSocketChannel.open(); 35 server.configureBlocking(false); 36 server.socket().bind(new InetSocketAddress(port)); 37 38 selector = Selector.open(); 39 server.register(selector, SelectionKey.OP_ACCEPT); 40 System.out.println("啟動伺服器, 監聽埠:" + port + "..."); 41 42 while (true) { 43 selector.select(); 44 Set<SelectionKey> selectionKeys = selector.selectedKeys(); 45 for (SelectionKey key : selectionKeys) { 46 // 處理被觸發的事件 47 handles(key); 48 } 49 selectionKeys.clear(); 50 } 51 52 } catch (IOException e) { 53 e.printStackTrace(); 54 } finally { 55 close(selector); 56 } 57 58 } 59 60 private void handles(SelectionKey key) throws IOException { 61 // ACCEPT事件 - 和客戶端建立了連線 62 if (key.isAcceptable()) { 63 ServerSocketChannel server = (ServerSocketChannel) key.channel(); 64 SocketChannel client = server.accept(); 65 client.configureBlocking(false); 66 client.register(selector, SelectionKey.OP_READ); 67 System.out.println(getClientName(client) + "已連線"); 68 } 69 // READ事件 - 客戶端傳送了訊息 70 else if (key.isReadable()) { 71 SocketChannel client = (SocketChannel) key.channel(); 72 String fwdMsg = receive(client); 73 if (fwdMsg.isEmpty()) { 74 // 客戶端異常 75 key.cancel(); 76 selector.wakeup(); 77 } else { 78 System.out.println(getClientName(client) + ":" + fwdMsg); 79 forwardMessage(client, fwdMsg); 80 81 // 檢查使用者是否退出 82 if (readyToQuit(fwdMsg)) { 83 key.cancel(); 84 selector.wakeup(); 85 System.out.println(getClientName(client) + "已斷開"); 86 } 87 } 88 89 } 90 } 91 92 private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException { 93 for (SelectionKey key: selector.keys()) { 94 Channel connectedClient = key.channel(); 95 if (connectedClient instanceof ServerSocketChannel) { 96 continue; 97 } 98 99 if (key.isValid() && !client.equals(connectedClient)) { 100 wBuffer.clear(); 101 wBuffer.put(charset.encode(getClientName(client) + ":" + fwdMsg)); 102 wBuffer.flip(); 103 while (wBuffer.hasRemaining()) { 104 ((SocketChannel)connectedClient).write(wBuffer); 105 } 106 } 107 } 108 } 109 110 private String receive(SocketChannel client) throws IOException { 111 rBuffer.clear(); 112 while(client.read(rBuffer) > 0); 113 rBuffer.flip(); 114 return String.valueOf(charset.decode(rBuffer)); 115 } 116 117 private String getClientName(SocketChannel client) { 118 return "客戶端[" + client.socket().getPort() + "]"; 119 } 120 121 private boolean readyToQuit(String msg) { 122 return QUIT.equals(msg); 123 } 124 125 private void close(Closeable closable) { 126 if (closable != null) { 127 try { 128 closable.close(); 129 } catch (IOException e) { 130 e.printStackTrace(); 131 } 132 } 133 } 134 135 public static void main(String[] args) { 136 ChatServer chatServer = new ChatServer(7777); 137 chatServer.start(); 138 } 139 }

客戶端實現

與服務端互動

  1 package client;
  2 
  3 import java.io.Closeable;
  4 import java.io.IOException;
  5 import java.net.InetSocketAddress;
  6 import java.nio.ByteBuffer;
  7 import java.nio.channels.*;
  8 import java.nio.charset.Charset;
  9 import java.util.Set;
 10 import java.util.concurrent.atomic.AtomicBoolean;
 11 
 12 public class ChatClient {
 13 
 14     private static final String DEFAULT_SERVER_HOST = "127.0.0.1";
 15     private static final int DEFAULT_SERVER_PORT = 8888;
 16     private static final String QUIT = "quit";
 17     private static final int BUFFER = 1024;
 18 
 19     private String host;
 20     private int port;
 21     private SocketChannel client;
 22     private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
 23     private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER);
 24     private Selector selector;
 25     private Charset charset = Charset.forName("UTF-8");
 26 
 27     public ChatClient() {
 28         this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT);
 29     }
 30 
 31     public ChatClient(String host, int port) {
 32         this.host = host;
 33         this.port = port;
 34     }
 35 
 36     public boolean readyToQuit(String msg) {
 37         return QUIT.equals(msg);
 38     }
 39 
 40     private void close(Closeable closable) {
 41         if (closable != null) {
 42             try {
 43                 closable.close();
 44             } catch (IOException e) {
 45                 e.printStackTrace();
 46             }
 47         }
 48     }
 49 
 50     private void start() {
 51         try {
 52             client = SocketChannel.open();
 53             client.configureBlocking(false);
 54 
 55             selector = Selector.open();
 56             client.register(selector, SelectionKey.OP_CONNECT);
 57             client.connect(new InetSocketAddress(host, port));
 58 
 59             while (true) {
 60                 selector.select();
 61                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
 62                 for (SelectionKey key : selectionKeys) {
 63                     handles(key);
 64                 }
 65                 selectionKeys.clear();
 66             }
 67         } catch (IOException e) {
 68             e.printStackTrace();
 69         } catch (ClosedSelectorException e) {
 70             // 使用者正常退出
 71         } finally {
 72             close(selector);
 73         }
 74 
 75     }
 76 
 77     private void handles(SelectionKey key) throws IOException {
 78         // CONNECT事件 - 連線就緒事件
 79         if (key.isConnectable()) {
 80             SocketChannel client = (SocketChannel) key.channel();
 81             if (client.isConnectionPending()) {
 82                 client.finishConnect();
 83                 // 處理使用者的輸入
 84                 new Thread(new UserInputHandler(this)).start();
 85             }
 86             client.register(selector, SelectionKey.OP_READ);
 87         }
 88         // READ事件 -  伺服器轉發訊息
 89         else if (key.isReadable()) {
 90             SocketChannel client = (SocketChannel) key.channel();
 91             String msg = receive(client);
 92             if (msg.isEmpty()) {
 93                 // 伺服器異常
 94                 close(selector);
 95             } else {
 96                 System.out.println(msg);
 97             }
 98         }
 99     }
100 
101     public void send(String msg) throws IOException {
102         if (msg.isEmpty()) {
103             return;
104         }
105 
106         wBuffer.clear();
107         wBuffer.put(charset.encode(msg));
108         wBuffer.flip();
109         while (wBuffer.hasRemaining()) {
110             client.write(wBuffer);
111         }
112 
113         // 檢查使用者是否準備退出
114         if (readyToQuit(msg)) {
115             close(selector);
116         }
117     }
118 
119     private String receive(SocketChannel client) throws IOException {
120         rBuffer.clear();
121         while (client.read(rBuffer) > 0);
122         rBuffer.flip();
123         return String.valueOf(charset.decode(rBuffer));
124     }
125 
126     public static void main(String[] args) {
127         ChatClient client = new ChatClient("127.0.0.1", 7777);
128         client.start();
129     }
130 }

處理使用者輸入

 1 package client;
 2 
 3 import java.io.BufferedReader;
 4 import java.io.IOException;
 5 import java.io.InputStreamReader;
 6 
 7 public class UserInputHandler implements Runnable {
 8 
 9     private ChatClient chatClient;
10 
11     public UserInputHandler(ChatClient chatClient) {
12         this.chatClient = chatClient;
13     }
14 
15     @Override
16     public void run() {
17         try {
18             // 等待使用者輸入訊息
19             BufferedReader consoleReader =
20                     new BufferedReader(new InputStreamReader(System.in));
21             while (true) {
22                 String input = consoleReader.readLine();
23 
24                 // 向伺服器傳送訊息
25                 chatClient.send(input);
26 
27                 // 檢查使用者是否準備退出
28                 if (chatClient.readyToQuit(input)) {
29                     break;
30                 }
31             }
32         } catch (IOException e) {
33             e.printStackTrace();
34         }
35     }
36 }