1. 程式人生 > 實用技巧 >說說Java網路程式設計

說說Java網路程式設計

網路程式設計的目的在於遠端傳送資料,傳送接收資料就涉及到I/O的操作,這裡因為涉及到比較底層位元組和字元的操作,所以不可以使用java.nio.file.Files 操作檔案。那就先說說I/O吧,I/O流分為位元組流和字元流。位元組即Byte,包含8位二進位制數,一個二進位制數就是1bit,中文名稱叫位。字元即一個字母或者一個漢字。一個字母由一個位元組組成,而漢字根據編碼不同由2個或者3個組成。

Java I/O類的實現原理是裝飾者模式,裝飾者模式動態地將責任附加到物件上。若要擴充套件功能,裝飾者提供了比繼承更有彈性的替代方案。

1.將被裝飾者(Concrete Component)當做類中一個成員變數。

2.利用構造將被裝飾者注入

然後引入I/O模型:

阻塞和非阻塞,描述的是結果的請求

阻塞:在得到結果之前就一直呆在那,啥也不幹,此時執行緒掛起,就如其名,執行緒被阻塞了。

非阻塞:如果沒得到結果就返回,等一會再去請求,直到得到結果為止。

非同步和同步,描述的是結果的發出,當呼叫方的請求進來。

同步:在沒獲取到結果前就不返回給呼叫方,如果呼叫方是阻塞的,那麼呼叫方就會一直等著。如果呼叫方是非阻塞的,呼叫方就會先回去,等一會再來問問得到結果沒。

非同步:呼叫方一來,會直接返回,等執行完實際的邏輯後在通過回撥函式把結果返回給呼叫方。

非同步非阻塞

使用者程序發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對使用者程序產生任何block。然後,kernel會等待資料準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,kernel會給使用者程序傳送一個signal,告訴它read操作完成了。

事實上就是,使用者提交IO請求,然後直接返回,並且核心自動完成將資料從核心緩衝區複製到使用者緩衝區,完成後再通知使用者。

當然,核心通知我們以後我們還需要執行剩餘的操作,但是我們的程式碼已經繼續往下運行了,所以AIO採用了回撥的機制,為每個socket註冊一個回撥事件或者是回撥處理器,在處理器中完成資料的操作,也就是核心通知到使用者的時候,會自動觸發回撥函式,完成剩餘操作。 這樣的方式就是非同步的網路程式設計。

但是,想要讓作業系統支援這樣的功能並非易事,windows的IOCP可以支援AIO方式,但是Linux的AIO支援並不是很好。(所以Netty後來也取消了對AIO的支援)

IO多路複用

:使用IO多路複用器管理socket,由於每個socket是一個檔案描述符,作業系統可以維護socket和它的連線狀態,一般分為可連線,可讀和可寫等狀態。

每當使用者程式接受到socket請求,將請求託管給多路複用器進行監控,當程式對請求感興趣的事件發生時,多路複用器以某種方式通知或是使用者程式自己輪詢請求,以便獲取就緒的socket,然後只需使用一個執行緒進行輪詢,多個執行緒處理就緒請求即可。

IO多路複用避免了每個socket請求都需要一個執行緒去處理,而是使用事件驅動的方式,讓少數的執行緒去處理多數socket的IO請求。

Linux作業系統對IO多路複用提供了較好的支援,select,poll,epoll是Linux提供的支援IO多路複用的API。一般使用者程式基於這個API去開發自己的IO複用模型。比如NIO的非阻塞模型,就是採用了IO多路複用的方式,是基於epoll實現的。

select方式主要是使用陣列來儲存socket描述符,系統將發生事件的描述符做標記,然後IO複用器在輪詢描述符陣列的時候,就可以知道哪些請求是就緒了的。缺點是陣列的長度只能到1024,並且需要不斷地在核心空間和使用者空間之間拷貝陣列。

poll方式不採用陣列儲存描述符,而是使用獨立的資料結構來描述,並且使用id來表示描述符,能支援更多的請求數量,缺點和select方式有點類似,就是輪詢的效率很低,並且需要拷貝資料。

當然,上述兩種方法適合在請求總數較少,並且活躍請求數較多的情況,這種場景下他們的效能還是不錯的。

epoll,epoll函式會在核心空間開闢一個特殊的資料結構,紅黑樹,樹節點中存放的是一個socket描述符以及使用者程式感興趣的事件型別。同時epoll還會維護一個連結串列。用於儲存已經就緒的socket描述符節點。由Linux核心完成對紅黑樹的維護,當事件到達時,核心將就緒的socket節點加入連結串列中,使用者程式可以直接訪問這個連結串列以便獲取就緒的socket。

有了直接與檔案互動的I/O類,那怎麼樣與網路互動呢?這裡就引入Socket:

socket是作業系統提供的網路程式設計介面,他封裝了對於TCP/IP協議棧的支援,用於程序間的通訊,當有連線接入主機以後,作業系統自動為其分配一個socket套接字,套接字繫結著一個IP與埠號。通過socket介面,可以獲取tcp連線的輸入流和輸出流,並且通過他們進行讀取和寫入此操作。

Java提供了net包用於socket程式設計,同時支援像Inetaddress,URL等工具類,使用socket繫結一個endpoint(ip+埠號),可以用於客戶端的請求處理和傳送,使用serversocket繫結本地ip和埠號,可以用於服務端接收TCP請求。

BIO程式設計模型

所謂BIO,就是Block IO,阻塞式的IO。這個阻塞主要發生在:ServerSocket接收請求時(accept()方法)、InputStream、OutputStream(輸入輸出流的讀和寫)都是阻塞的。這個可以在下面程式碼的除錯中發現,比如在客戶端接收伺服器訊息的輸入流處打上斷點,除非伺服器發來訊息,不然斷點是一直停在這個地方的。也就是說這個執行緒在這時間是被阻塞的。

這裡放一個BIO模型寫的聊天室,對於Socket程式設計基礎請移步:https://www.cnblogs.com/yiwangzhibujian/p/7107785.html

服務端:

 1 /*
 2 1. 功能實現:這個類的作用就像Acceptor。它有兩個比較關鍵的全域性變數,一個就是儲存線上使用者資訊的Map,一個就是執行緒池。
 3 這個類會監聽埠,接收客戶端的請求,然後為客戶端分配工作執行緒。
 4 還會提供一些常用的工具方法給每個工作執行緒呼叫,比如:傳送訊息、新增線上使用者等。
 5 */
 6 
 7 import java.io.*;
 8 import java.net.*;
 9 import java.util.Map;
10 import java.util.concurrent.*;
11 
12 public class ChatServer {
13     private int DEFAULT_PORT = 8888;
14     /**
15      * 建立一個Map儲存線上使用者的資訊。這個map可以統計線上使用者、針對這些使用者可以轉發其他使用者傳送的訊息
16      * 因為會有多個執行緒操作這個map,所以為了安全起見用ConcurrentHashMap
17      * 在這裡key就是客戶端的埠號,但在實際中肯定不會用埠號區分使用者,如果是web的話一般用session。
18      * value是IO的Writer,用以儲存客戶端傳送的訊息
19      */
20     private Map<Integer, Writer> map = new ConcurrentHashMap<>();
21     /**
22      * 建立執行緒池,執行緒上限為10個,如果第11個客戶端請求進來,伺服器會接收但是不會去分配執行緒處理它。
23      * 前10個客戶端的聊天記錄,它看不見。當有一個客戶端下線時,這第11個客戶端就會被分配執行緒,伺服器顯示線上
24      * 大家可以把10再設定小一點,測試看看
25      * */
26     private ExecutorService executorService = Executors.newFixedThreadPool(10);
27     //客戶端連線時往map新增客戶端
28     public void addClient(Socket socket) throws IOException {
29         if (socket != null) {
30             BufferedWriter writer = new BufferedWriter(
31                     new OutputStreamWriter(socket.getOutputStream())
32             );
33             map.put(socket.getPort(), writer);
34             System.out.println("Client["+socket.getPort()+"]:Online");
35         }
36     }
37 
38     //斷開連線時map裡移除客戶端
39     public void removeClient(Socket socket) throws Exception {
40         if (socket != null) {
41             if (map.containsKey(socket.getPort())) {
42                 map.get(socket.getPort()).close();
43                 map.remove(socket.getPort());
44             }
45             System.out.println("Client[" + socket.getPort() + "]Offline");
46         }
47     }
48 
49     //轉發客戶端訊息,這個方法就是把訊息傳送給線上的其他的所有客戶端
50     public void sendMessage(Socket socket, String msg) throws IOException {
51         //遍歷線上客戶端
52         for (Integer port : map.keySet()) {
53             //傳送給線上的其他客戶端
54             if (port != socket.getPort()) {
55                 Writer writer = map.get(port);
56                 writer.write(msg);
57                 writer.flush();
58             }
59         }
60     }
61 
62     //接收客戶端請求,並分配Handler去處理請求
63     public void start() {
64         try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
65             System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
66             while (true){
67                 //等待客戶端連線
68                 Socket socket=serverSocket.accept();
69                 //為客戶端分配一個ChatHandler執行緒
70                 executorService.execute(new ChatHandler(this, socket));
71             }
72         } catch (IOException e) {
73             e.printStackTrace();
74         }
75     }
76 
77     public static void main(String[] args) {
78         ChatServer server=new ChatServer();
79         server.start();
80     }
81 }
View Code

服務端的ChatHandler: