手動搭建I/O網路通訊框架2:BIO程式設計模型實現群聊
第一章:手動搭建I/O網路通訊框架1:Socket和ServerSocket入門實戰,實現單聊
在第一章中運用Socket和ServerSocket簡單的實現了網路通訊。這一章,利用BIO程式設計模型進行升級改造,實現群聊聊天室。
如圖:當一個客戶端請求進來時,接收器會為這個客戶端分配一個工作執行緒,這個工作執行緒專職處理客戶端的操作。在上一章中,伺服器接收到客戶端請求後就跑去專門服務這個客戶端了,所以當其他請求進來時,是處理不到的。
看到這個圖,很容易就會想到執行緒池,BIO是一個相對簡單的模型,實現它的關鍵之處也在於執行緒池。
在上程式碼之前,先大概說清楚每個類的作用,以免弄混淆。更詳細的說明,都寫在註釋當中。
伺服器端:
ChatServer:這個類的作用就像圖中的Acceptor。它有兩個比較關鍵的全域性變數,一個就是儲存線上使用者資訊的Map,一個就是執行緒池。這個類會監聽埠,接收客戶端的請求,然後為客戶端分配工作執行緒。還會提供一些常用的工具方法給每個工作執行緒呼叫,比如:傳送訊息、新增線上使用者等。
ChatHandler:這個類就是工作執行緒的類。在這個專案中,它的工作很簡單:把接收到的訊息轉發給其他客戶端,當然還有一些小功能,比如新增\移除線上使用者。
客戶端:
相較於伺服器,客戶端的改動較小,主要是把等待使用者輸入資訊這個功能分到其他執行緒做,不然這個功能會一直阻塞主執行緒,導致無法接收其他客戶端的訊息。
ChatClient:客戶端啟動類,也就是主執行緒,會通過Socket和伺服器連線。也提供了兩個工具方法:傳送訊息和接收訊息。
UserInputHandler:專門負責等待使用者輸入資訊的執行緒,一旦有資訊鍵入,就馬上傳送給伺服器。
首先建立兩個包區分一下客戶端和伺服器,client和server
伺服器端ChatServer:
public class ChatServer { private int DEFAULT_PORT = 8888; /** * 建立一個Map儲存線上使用者的資訊。這個map可以統計線上使用者、針對這些使用者可以轉發其他使用者傳送的訊息 * 因為會有多個執行緒操作這個map,所以為了安全起見用ConcurrentHashMap * 在這裡key就是客戶端的埠號,但在實際中肯定不會用埠號區分使用者,如果是web的話一般用session。 * value是IO的Writer,用以儲存客戶端傳送的訊息 */ private Map<Integer, Writer> map=new ConcurrentHashMap<>(); /** * 建立執行緒池,執行緒上限為10個,如果第11個客戶端請求進來,伺服器會接收但是不會去分配執行緒處理它。 * 前10個客戶端的聊天記錄,它看不見。當有一個客戶端下線時,這第11個客戶端就會被分配執行緒,伺服器顯示線上 * 大家可以把10再設定小一點,測試看看 * */ private ExecutorService executorService= Executors.newFixedThreadPool(10); //客戶端連線時往map新增客戶端 public void addClient(Socket socket) throws IOException { if (socket != null) { BufferedWriter writer = new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); map.put(socket.getPort(), writer); System.out.println("Client["+socket.getPort()+"]:Online"); } } //斷開連線時map裡移除客戶端 public void removeClient(Socket socket) throws Exception { if (socket != null) { if (map.containsKey(socket.getPort())) { map.get(socket.getPort()).close(); map.remove(socket.getPort()); } System.out.println("Client[" + socket.getPort() + "]Offline"); } } //轉發客戶端訊息,這個方法就是把訊息傳送給線上的其他的所有客戶端 public void sendMessage(Socket socket, String msg) throws IOException { //遍歷線上客戶端 for (Integer port : map.keySet()) { //傳送給線上的其他客戶端 if (port != socket.getPort()) { Writer writer = map.get(port); writer.write(msg); writer.flush(); } } } //接收客戶端請求,並分配Handler去處理請求 public void start() { try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) { System.out.println("Server Start,The Port is:"+DEFAULT_PORT); while (true){ //等待客戶端連線 Socket socket=serverSocket.accept(); //為客戶端分配一個ChatHandler執行緒 executorService.execute(new ChatHandler(this,socket)); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { ChatServer server=new ChatServer(); server.start(); } }
伺服器端ChatHandler:
public class ChatHandler implements Runnable { private ChatServer server; private Socket socket; //建構函式,ChatServer通過這個分配Handler執行緒 public ChatHandler(ChatServer server, Socket socket) { this.server = server; this.socket = socket; } @Override public void run() { try { //往map裡新增這個客戶端 server.addClient(socket); //讀取這個客戶端傳送的訊息 BufferedReader reader = new BufferedReader( new InputStreamReader(socket.getInputStream()) ); String msg = null; while ((msg = reader.readLine()) != null) { //這樣拼接是為了讓其他客戶端也能看清是誰傳送的訊息 String sendmsg = "Client[" + socket.getPort() + "]:" + msg; //伺服器列印這個訊息 System.out.println(sendmsg); //將收到的訊息轉發給其他線上客戶端 server.sendMessage(socket, sendmsg + "\n"); if (msg.equals("quit")) { break; } } } catch (IOException e) { e.printStackTrace(); } finally { //如果使用者退出或者發生異常,就在map中移除該客戶端 try { server.removeClient(socket); } catch (Exception e) { e.printStackTrace(); } } } }
客戶端ChatClient:
public class ChatClient { private BufferedReader reader; private BufferedWriter writer; private Socket socket; //傳送訊息給伺服器 public void sendToServer(String msg) throws IOException { //傳送之前,判斷socket的輸出流是否關閉 if (!socket.isOutputShutdown()) { //如果沒有關閉就把使用者鍵入的訊息放到writer裡面 writer.write(msg + "\n"); writer.flush(); } } //從伺服器接收訊息 public String receive() throws IOException { String msg = null; //判斷socket的輸入流是否關閉 if (!socket.isInputShutdown()) { //沒有關閉的話就可以通過reader讀取伺服器傳送來的訊息。注意:如果沒有讀取到訊息執行緒會阻塞在這裡 msg = reader.readLine(); } return msg; } public void start() { //和服務建立連線 try { socket = new Socket("127.0.0.1", 8888); reader=new BufferedReader( new InputStreamReader(socket.getInputStream()) ); writer=new BufferedWriter( new OutputStreamWriter(socket.getOutputStream()) ); //新建一個執行緒去監聽使用者輸入的訊息 new Thread(new UserInputHandler(this)).start(); /** * 不停的讀取伺服器轉發的其他客戶端的資訊 * 記錄一下之前踩過的小坑: * 這裡一定要建立一個msg接收資訊,如果直接用receive()方法判斷和輸出receive()的話會造成有的訊息不會顯示 * 因為receive()獲取時,在返回之前是阻塞的,一旦接收到訊息才會返回,也就是while這裡是阻塞的,一旦有訊息就會進入到while裡面 * 這時候如果輸出的是receive(),那麼上次獲取的資訊就會丟失,然後阻塞在System.out.println * */ String msg=null; while ((msg=receive())!=null){ System.out.println(msg); } } catch (IOException e) { e.printStackTrace(); }finally { try { if(writer!=null){ writer.close(); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { new ChatClient().start(); } }
客戶端UserInputHandler:
public class UserInputHandler implements Runnable { private ChatClient client; public UserInputHandler(ChatClient client) { this.client = client; } @Override public void run() { try { //接收使用者輸入的訊息 BufferedReader reader = new BufferedReader( new InputStreamReader(System.in) ); //不停的獲取reader中的System.in,實現了等待使用者輸入的效果 while (true) { String input = reader.readLine(); //向伺服器傳送訊息 client.sendToServer(input); if (input.equals("quit")) break; } } catch (IOException e) { e.printStackTrace(); } } }
執行測試:
通過開啟終端,通過javac編譯。如果大家是在IDEA上編碼的話可能會報編碼錯誤,在javac後面加上-encoding utf-8再接java檔案就好了。
編譯後執行,通過java執行時,又遇到了一個坑。會報找不到主類的錯誤,原來是因為加上兩個包,要在class檔名前面加上包名。比如當前在src目錄,下面有client和server兩個包,要這麼執行:java client.XXXX。可我之前明明在client資料夾下執行的java,也是不行,不知道為什麼。
接著測試:
1.首先在一個終端裡執行ChatServer,開啟伺服器
2.在第二個終端裡開啟ChatClient,暫且叫A,此時伺服器的終端顯示:
3.類似的,在第三個終端裡開啟ChatClient,暫且叫B,此時伺服器顯示:
4.A中輸入hi,除了伺服器會列印hi外,B中也會顯示,圖片中的埠號和前面的不一樣,是因為中間出了點小問題,前三張截圖和後面的不是同時執行的。實際中同一個客戶端會顯示一樣的埠號:
5.當客戶端輸入quit時就會斷開連線,最後,伺服器的顯示為: