03基於NIO的聊天室案例
阿新 • 發佈:2021-06-30
服務端程式碼實現
package server;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;

/**
 * Single-threaded, selector-based chat server.
 *
 * <p>One thread accepts connections and reads client messages through a
 * {@link Selector}; every message received from a client is forwarded to all
 * other connected clients, prefixed with the sender's name. A client that sends
 * the literal text {@code quit}, or whose read yields no data, is disconnected.
 */
public class ChatServer {

    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    private ServerSocketChannel server;
    private Selector selector;
    // Scratch buffer for reads; only ever touched by the selector thread.
    private final ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER);
    private final Charset charset = StandardCharsets.UTF_8;
    private final int port;

    /** Creates a server that listens on {@code DEFAULT_PORT}. */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind when the server is started
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening channel and runs the select loop until an I/O error
     * occurs on the selector or the listening channel. All registered channels,
     * the selector and the listening channel are closed before returning.
     */
    private void start() {
        try {
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.socket().bind(new InetSocketAddress(port));

            selector = Selector.open();
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("啟動伺服器, 監聽埠:" + port + "...");

            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    // A key may have been cancelled while an earlier key of this
                    // batch was handled (its channel was closed); querying its
                    // ready set would throw CancelledKeyException.
                    if (!key.isValid()) {
                        continue;
                    }
                    try {
                        // Handle the event that fired for this key.
                        handles(key);
                    } catch (IOException e) {
                        if (!(key.channel() instanceof SocketChannel)) {
                            // Failure on the listening channel: nothing to recover.
                            throw e;
                        }
                        // A failure on one client connection (e.g. a reset) must
                        // not take the whole server down; drop only that client.
                        e.printStackTrace();
                        close(key.channel());
                    }
                }
                selectionKeys.clear();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Dispatches a ready key: accepts a new connection for an ACCEPT event, or
     * reads and forwards a message for a READ event.
     *
     * @param key the selected key to process
     * @throws IOException if accepting or reading fails
     */
    private void handles(SelectionKey key) throws IOException {
        if (key.isAcceptable()) {
            // ACCEPT event - a client has connected.
            acceptClient((ServerSocketChannel) key.channel());
        } else if (key.isReadable()) {
            // READ event - a client has sent a message.
            readFromClient((SocketChannel) key.channel());
        }
    }

    /** Accepts one pending connection and registers it for READ events. */
    private void acceptClient(ServerSocketChannel listener) throws IOException {
        SocketChannel client = listener.accept();
        if (client == null) {
            // Non-blocking accept returns null when no connection is pending.
            return;
        }
        try {
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } catch (IOException e) {
            // Failing to set up one client should not stop the server.
            e.printStackTrace();
            close(client);
            return;
        }
        System.out.println(getClientName(client) + "已連線");
    }

    /** Reads one message from the client, forwards it, and handles disconnects. */
    private void readFromClient(SocketChannel client) throws IOException {
        String fwdMsg = receive(client);
        if (fwdMsg.isEmpty()) {
            // Nothing could be read: treat the client as gone. Closing the
            // channel also cancels its key, so the socket is not leaked.
            close(client);
            return;
        }

        System.out.println(getClientName(client) + ":" + fwdMsg);
        forwardMessage(client, fwdMsg);

        // Check whether the user asked to leave.
        if (readyToQuit(fwdMsg)) {
            // Print before closing so the remote port is still available.
            System.out.println(getClientName(client) + "已斷開");
            close(client);
        }
    }

    /**
     * Sends {@code fwdMsg}, prefixed with the sender's name, to every connected
     * client except the sender.
     *
     * <p>The message is encoded once and written straight from the encoded
     * buffer, so its length is not limited by a fixed-size scratch buffer. A
     * recipient whose write fails is closed and skipped; the remaining
     * recipients still receive the message.
     *
     * <p>NOTE(review): the write loop spins while a recipient's socket send
     * buffer is full, so one slow client can stall the server. A per-client
     * outbound queue driven by OP_WRITE would remove that limitation.
     *
     * @param client the sending client, excluded from the recipients
     * @param fwdMsg the message text to forward
     * @throws IOException declared for callers; per-recipient failures are handled here
     */
    private void forwardMessage(SocketChannel client, String fwdMsg) throws IOException {
        ByteBuffer encoded = charset.encode(getClientName(client) + ":" + fwdMsg);
        for (SelectionKey key : selector.keys()) {
            Channel connectedClient = key.channel();
            if (!(connectedClient instanceof SocketChannel)) {
                // Skip the listening channel.
                continue;
            }
            if (!key.isValid() || client.equals(connectedClient)) {
                continue;
            }

            // Independent position/limit per recipient over the same bytes.
            ByteBuffer outgoing = encoded.duplicate();
            try {
                while (outgoing.hasRemaining()) {
                    ((SocketChannel) connectedClient).write(outgoing);
                }
            } catch (IOException e) {
                e.printStackTrace();
                close(connectedClient);
            }
        }
    }

    /**
     * Reads everything currently available from the client and decodes it.
     *
     * <p>Bytes are accumulated across as many buffer fills as needed, so a
     * message longer than {@code BUFFER} bytes is not cut in the middle (which
     * could also split a multi-byte character).
     *
     * @param client channel to read from
     * @return the decoded text; empty when no bytes could be read
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel client) throws IOException {
        ByteArrayOutputStream received = new ByteArrayOutputStream();
        int bytesRead;
        do {
            rBuffer.clear();
            bytesRead = client.read(rBuffer);
            if (bytesRead > 0) {
                received.write(rBuffer.array(), 0, bytesRead);
            }
        } while (bytesRead > 0);
        return new String(received.toByteArray(), charset);
    }

    /** Returns the display name of a client, built from its remote port. */
    private String getClientName(SocketChannel client) {
        return "客戶端[" + client.socket().getPort() + "]";
    }

    /** Returns whether {@code msg} is exactly the quit command. */
    private boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Closes every registered channel, then the selector and the listening channel. */
    private void shutdown() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                close(key.channel());
            }
        }
        close(selector);
        close(server);
    }

    /** Closes the resource if it is non-null, reporting (not propagating) failures. */
    private void close(Closeable closable) {
        if (closable != null) {
            try {
                closable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(7777);
        chatServer.start();
    }
}
客戶端實現
與服務端互動
1 package client; 2 3 import java.io.Closeable; 4 import java.io.IOException; 5 import java.net.InetSocketAddress; 6 import java.nio.ByteBuffer; 7 import java.nio.channels.*; 8 import java.nio.charset.Charset; 9 import java.util.Set; 10 import java.util.concurrent.atomic.AtomicBoolean; 11 12 public class ChatClient { 13 14 private static final String DEFAULT_SERVER_HOST = "127.0.0.1"; 15 private static final int DEFAULT_SERVER_PORT = 8888; 16 private static final String QUIT = "quit"; 17 private static final int BUFFER = 1024; 18 19 private String host; 20 private int port; 21 private SocketChannel client; 22 private ByteBuffer rBuffer = ByteBuffer.allocate(BUFFER); 23 private ByteBuffer wBuffer = ByteBuffer.allocate(BUFFER); 24 private Selector selector; 25 private Charset charset = Charset.forName("UTF-8"); 26 27 public ChatClient() { 28 this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT); 29 } 30 31 public ChatClient(String host, int port) { 32 this.host = host; 33 this.port = port; 34 } 35 36 public boolean readyToQuit(String msg) { 37 return QUIT.equals(msg); 38 } 39 40 private void close(Closeable closable) { 41 if (closable != null) { 42 try { 43 closable.close(); 44 } catch (IOException e) { 45 e.printStackTrace(); 46 } 47 } 48 } 49 50 private void start() { 51 try { 52 client = SocketChannel.open(); 53 client.configureBlocking(false); 54 55 selector = Selector.open(); 56 client.register(selector, SelectionKey.OP_CONNECT); 57 client.connect(new InetSocketAddress(host, port)); 58 59 while (true) { 60 selector.select(); 61 Set<SelectionKey> selectionKeys = selector.selectedKeys(); 62 for (SelectionKey key : selectionKeys) { 63 handles(key); 64 } 65 selectionKeys.clear(); 66 } 67 } catch (IOException e) { 68 e.printStackTrace(); 69 } catch (ClosedSelectorException e) { 70 // 使用者正常退出 71 } finally { 72 close(selector); 73 } 74 75 } 76 77 private void handles(SelectionKey key) throws IOException { 78 // CONNECT事件 - 連線就緒事件 79 if (key.isConnectable()) { 80 SocketChannel 
client = (SocketChannel) key.channel(); 81 if (client.isConnectionPending()) { 82 client.finishConnect(); 83 // 處理使用者的輸入 84 new Thread(new UserInputHandler(this)).start(); 85 } 86 client.register(selector, SelectionKey.OP_READ); 87 } 88 // READ事件 - 伺服器轉發訊息 89 else if (key.isReadable()) { 90 SocketChannel client = (SocketChannel) key.channel(); 91 String msg = receive(client); 92 if (msg.isEmpty()) { 93 // 伺服器異常 94 close(selector); 95 } else { 96 System.out.println(msg); 97 } 98 } 99 } 100 101 public void send(String msg) throws IOException { 102 if (msg.isEmpty()) { 103 return; 104 } 105 106 wBuffer.clear(); 107 wBuffer.put(charset.encode(msg)); 108 wBuffer.flip(); 109 while (wBuffer.hasRemaining()) { 110 client.write(wBuffer); 111 } 112 113 // 檢查使用者是否準備退出 114 if (readyToQuit(msg)) { 115 close(selector); 116 } 117 } 118 119 private String receive(SocketChannel client) throws IOException { 120 rBuffer.clear(); 121 while (client.read(rBuffer) > 0); 122 rBuffer.flip(); 123 return String.valueOf(charset.decode(rBuffer)); 124 } 125 126 public static void main(String[] args) { 127 ChatClient client = new ChatClient("127.0.0.1", 7777); 128 client.start(); 129 } 130 }
處理使用者輸入
1 package client; 2 3 import java.io.BufferedReader; 4 import java.io.IOException; 5 import java.io.InputStreamReader; 6 7 public class UserInputHandler implements Runnable { 8 9 private ChatClient chatClient; 10 11 public UserInputHandler(ChatClient chatClient) { 12 this.chatClient = chatClient; 13 } 14 15 @Override 16 public void run() { 17 try { 18 // 等待使用者輸入訊息 19 BufferedReader consoleReader = 20 new BufferedReader(new InputStreamReader(System.in)); 21 while (true) { 22 String input = consoleReader.readLine(); 23 24 // 向伺服器傳送訊息 25 chatClient.send(input); 26 27 // 檢查使用者是否準備退出 28 if (chatClient.readyToQuit(input)) { 29 break; 30 } 31 } 32 } catch (IOException e) { 33 e.printStackTrace(); 34 } 35 } 36 }