從同步阻塞到非同步非阻塞角度看網路程式設計
1. 最簡單的網路通訊——同步阻塞通訊(BIO)
首先來看一個傳統簡單的網路通訊案例,該案例是基於同步阻塞的I/O,服務端程式碼如下
public class Server extends Thread{ private ServerSocket serverSocket; public Server(int port) throws IOException { serverSocket = new ServerSocket(port, 1000); //埠號,以及執行連線可以儲存的最長佇列 serverSocket.setSoTimeout(1000000); } public void run() { while(true) { try { System.out.println("等待遠端連線,埠號為:" + serverSocket.getLocalPort() + "..."); Socket server = serverSocket.accept(); System.out.println("遠端主機地址:" + server.getRemoteSocketAddress()); DataInputStream in = new DataInputStream(server.getInputStream()); Thread.sleep(2000); System.out.println(in.readUTF()); DataOutputStream out = new DataOutputStream(server.getOutputStream()); out.writeUTF("0101, 主機收到:" + server.getLocalSocketAddress() + "\nGoodbye!"); server.close(); }catch(SocketTimeoutException s) { System.out.println("Socket timed out!"); break; }catch(IOException e) { e.printStackTrace(); break; } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String [] args) throws IOException { Thread t = new Server(6666); t.run(); } }
客戶端程式碼如下:
public class Client implements Runnable{ private int id; public Client(int id){ this.id = id; } public static void main(String[] args) throws InterruptedException, IOException { ExecutorService es = Executors.newFixedThreadPool(100); for (int i = 0; i < 100; i++) { es.execute(new Client(i+1)); } es.shutdown(); } @Override public void run() { Socket client = null; try { client = new Socket("127.0.0.1", 6666); OutputStream outToServer = client.getOutputStream(); DataOutputStream out = new DataOutputStream(outToServer); out.writeUTF("Hello, I am the " + id + "-client and I come from " + client.getLocalSocketAddress()); InputStream inFromServer = client.getInputStream(); DataInputStream in = new DataInputStream(inFromServer); System.out.println("client-" + id + " : response : " + in.readUTF()); client.close(); } catch (Exception e) { e.printStackTrace(); } } }
看到當假設100個客戶端同時連線伺服器的時候,單執行緒下服務端對接收的請求只會一個一個去處理,導致很多客戶端請求被阻塞,處於等待情況,這個時候,通常的服務端優化的解決辦法是開啟利用執行緒池開啟多個執行緒去處理。如下:
public class BlockServer implements Runnable{ private Socket server; public BlockServer(Socket server){ this.server = server; } @Override public void run() { DataInputStream in = null; DataOutputStream out = null; try { in = new DataInputStream(server.getInputStream()); System.out.println(server.getInetAddress() + ":" + in.readUTF()); out = new DataOutputStream(server.getOutputStream()); Thread.sleep(2000); out.writeUTF("server receive your message." ); in.close(); out.close(); server.close(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { ExecutorService es = Executors.newFixedThreadPool(100); ServerSocket serverSocket = new ServerSocket(6666, 1000); System.out.println("等待遠端連線,埠號為:" + serverSocket.getLocalPort() + "..."); while (!Thread.currentThread().isInterrupted()){ Socket socket = serverSocket.accept(); es.execute(new BlockServer(socket)); } es.shutdown(); } }
兩種結果的輸出可以看出基於多執行緒的網路通訊效率遠遠高於單執行緒。不過多執行緒通訊有一個很大的缺陷——嚴重依賴執行緒,通常在Linux環境下並沒有執行緒的概念,此時,執行緒的本質就是程序了,此時執行緒的建立銷燬,以及執行緒(上下文)的切換將導致很大的開銷,因此,基於這些原因,導致了執行緒資源不能隨便的使用,當我們面對大量的客戶端連線伺服器的時候,並不能一味的去瘋狂建立執行緒。此時,NIO就可以幫助我們解決此類問題。
2. 多路複用的NIO(New IO)——同步非阻塞
BIO模型中,因為在進行IO操作的時候,程式無法知道資料到底準備好沒有,能否可讀,只能一直乾等著,而且即便我們可以猜到什麼時候資料準備好了,但我們也沒有辦法通過socket.read()或者socket.write()函式去返回,而NIO卻可以通過I/O複用技術把這些連線請求註冊到多路複用器Selector中去,用一個執行緒去監聽和處理多個SocketChannel上的事件。
BufferByte和Channel
在NIO中並不是以流的方式來處理資料的,而是以buffer緩衝區和Channel管道(全雙工)配合使用來處理資料。這裡可以用鐵路交通來類比兩者的關係,假設現在有一批貨物要從北京運到上海且用鐵路運輸,則要有一條從北京到上海的鐵路,以及一列運輸貨物的火車,這裡貨物就是客戶端和服務端的交流的資訊,Channel管道則是從北京到上海的鐵路,而buffer緩衝區則是這列運輸火車。 其中Channel分為四類:
-
FileChannel: 用於檔案IO,支援阻塞模式。可以通過InputStream/OutputStream/RandomAccssFile去獲取該物件。該Channel的用法在後面的檔案傳輸示例程式碼中有展示,
-
DatagramChannel: 用於UDP通訊。
-
SocketChannel: 用於TCP的客戶端通訊。客戶端通過SocketChannel.open()獲得該物件。
-
ServerSocketChannel: 用於TCP的服務端通訊。服務端通過ServerSocketChannel.open()獲得該物件。
服務端ServerSocketChannel可以通過呼叫accept方法返回新建立的SocketChannel物件,通過該物件呼叫wriet/read(ByteBuffer)來將資料寫入通道或從通道中讀取資料。而ByteBuffer的用法,主要涉及到幾個變數:capacity,position,limit和mark,具體含義如下程式碼所示,如果要讀取buffer中的資料必須呼叫flip方法,通過改變position和limit的值,來讀取兩個下標之間資料。如下所示:
public class Test1 { public static void main(String[] args) { // 建立一個緩衝區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // 看一下初始時4個核心變數的值 //limit 緩衝區裡的資料的總數 System.out.println("初始時-->limit--->"+byteBuffer.limit()); //position 下一個要被讀或寫的元素的位置 System.out.println("初始時-->position--->"+byteBuffer.position()); //capacity 緩衝區能夠容納的資料元素的最大數量。 System.out.println("初始時-->capacity--->"+byteBuffer.capacity()); //mark 一個備忘位置。用於記錄上一次讀寫的位置。 System.out.println("初始時-->mark--->" + byteBuffer.mark()); System.out.println("--------------------------------------"); // 新增一些資料到緩衝區中 String s = "testing....."; byteBuffer.put(s.getBytes()); // 看一下初始時4個核心變數的值 System.out.println("put完之後-->limit--->"+byteBuffer.limit()); System.out.println("put完之後-->position--->"+byteBuffer.position()); System.out.println("put完之後-->capacity--->"+byteBuffer.capacity()); System.out.println("put完之後-->mark--->" + byteBuffer.mark()); //讀資料前要呼叫,可以指示讀資料的操作從position讀到limit之間的資料 byteBuffer.flip(); System.out.println("--------------------------------------"); System.out.println("flip完之後-->limit--->"+byteBuffer.limit()); System.out.println("flip完之後-->position--->"+byteBuffer.position()); System.out.println("flip完之後-->capacity--->"+byteBuffer.capacity()); System.out.println("flip完之後-->mark--->" + byteBuffer.mark()); // 建立一個limit()大小的位元組陣列(因為就只有limit這麼多個數據可讀) byte[] bytes = new byte[byteBuffer.limit()]; // 將讀取的資料裝進我們的位元組陣列中 byteBuffer.get(bytes); // 輸出資料 System.out.println(new String(bytes, 0, bytes.length)); } } /*output 初始時-->limit--->1024 初始時-->position--->0 初始時-->capacity--->1024 初始時-->mark--->java.nio.HeapByteBuffer[pos=0 lim=1024 cap=1024] -------------------------------------- put完之後-->limit--->1024 put完之後-->position--->12 put完之後-->capacity--->1024 put完之後-->mark--->java.nio.HeapByteBuffer[pos=12 lim=1024 cap=1024] -------------------------------------- flip完之後-->limit--->12 flip完之後-->position--->0 flip完之後-->capacity--->1024 flip完之後-->mark--->java.nio.HeapByteBuffer[pos=0 lim=12 cap=1024] testing..... */
一些用NIO模型實現的簡單demo,可以檢視[github地址],有檔案傳輸以及多客戶端廣播的demo。
NIO是Java SE 1.4版,為了提升網路傳輸效能而設計的新版本的IO,注意,這裡的優化主要針對的是網路通訊方面的socket的優化。如下程式可以測試針對本地檔案IO,兩者的異同。
public class FileTransformCompare { //傳統方式 private long transferFile(File source, File dest) throws IOException { long startTime = System.currentTimeMillis(); if(!dest.exists()) dest.createNewFile(); BufferedInputStream bis = new BufferedInputStream(new FileInputStream(source)); BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(dest)); //將資料從源讀到目的檔案 byte[] bytes = new byte[1024]; int len = 0; while ((len = bis.read(bytes))>0){ bos.write(bytes, 0, len); } long endTime = System.currentTimeMillis(); return endTime - startTime; } //NIO方式 private long transferFileFileWithNio(File source, File dest) throws IOException { long startTime = System.currentTimeMillis(); if(!dest.exists()) dest.createNewFile(); RandomAccessFile sourceRAF = new RandomAccessFile(source, "rw"); RandomAccessFile destRAF = new RandomAccessFile(dest, "rw"); FileChannel readChannel = sourceRAF.getChannel(); FileChannel writeChannel = destRAF.getChannel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024*1024); //1M緩衝區 while (readChannel.read(byteBuffer) > 0){ byteBuffer.flip(); writeChannel.write(byteBuffer); byteBuffer.clear(); } writeChannel.close(); readChannel.close(); long endTime = System.currentTimeMillis(); return endTime - startTime; } public static void main(String[] args) throws IOException { FileTransformCompare ftc = new FileTransformCompare(); // File source = new File("F:\\apache-maven-3.6.2-bin.tar.gz"); // File dest1 = new File("G:\\迅雷下載\\apache1.tar.gz"); // File dest2 = new File("G:\\迅雷下載\\apache2.tar.gz"); File source = new File("G:\\迅雷下載\\影視\\戰爭之王.BD1280超清國英雙語中英雙字.mp4"); File dest1 = new File("G:\\迅雷下載\\test1.mp4"); File dest2 = new File("G:\\迅雷下載\\test2.mp4"); long time = ftc.transferFile(source, dest1); System.out.println("普通位元組流時間: " + time); long timeNio = ftc.transferFileFileWithNio(source, dest2); System.out.println("NIO時間: " + timeNio); } } /* 當檔案的大小較小的時候,NIO會比傳統IO好一點,但是檔案較大的時候,則NIO不如傳統IO 下面結果是複製一部2.6G的電影的結果: 普通位元組流時間: 79745 NIO時間: 80160 */
也就是說,通常談到NIO的時候,只會針對網路程式設計來說。
3. AIO 非同步非阻塞I/O
NIO的非阻塞模式採用多路複用器(Selector),用一個執行緒不斷的去輪詢所有的通道,一旦某個通道有資料可讀(或可寫),則表示該通道資料以及準備好(通道可寫),那麼這個通道就會被選擇出來,對它進行讀寫操作,但是要注意的是在執行讀寫操作的執行緒本身就是堵塞的,要等待該對該通道的資料操作完成,執行緒才可以去操作其他通道。
而AIO(Asynchronous IO)則是由作業系統在IO操作完成之後再去通知呼叫者,這就意味著執行程式的執行緒再發起讀寫操作的時候總是立即返回的,這個時候可以去做其他的事情,當底層讀寫操作完成的時候,將由作業系統通過呼叫相應的回撥函式將已經讀到的函式交給程式進行處理(寫入過程一樣)。正因如此,會導致不同的作業系統上的效能表現會不同,在Linux系統中AIO的底層系統實現是epoll函式(NIO的底層實現是select函式或者poll函式——兩者的區別在於能儲存檔案描述符的數量有關,因為**select存放檔案描述符的地方是一個數組,而poll則是用連結串列去儲存**)
AIO主要針對一些非同步的IO操作,作業系統執行完讀寫事件後就會呼叫程式的回撥函式—— java.util.concurrent.Future物件和java.nio.channels.CompletionHandler,而Future是基於CompletionHandler的封裝。因為該過資料的讀寫都是由作業系統負責,則回撥函式只需要負責準備傳送資料或者解析讀取的資料即可。
主要的API如下
1. AsynchronousChannelGroup——非同步通訊組,非同步通道在處理 I/O請求時,需要使用一個AsynchronousChannelGroup類,該類的物件表示的是一個非同步通道的分組,每一個分組都有一個執行緒池與之對應,需要使用AsynchronousChannelGroup類的靜態工廠方法withThreadPool(ExectorService es); withFixedThreadPool();withCachedThreadPool()設定執行緒池。
AsynchronousServerSocketChannel
: 非同步版的ServerSocketChannel
,其accpet方法有兩種:
//第一種 AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(null); Future<AsynchronousSocketChannel> future = server.accept(); future.isDone(); //返回物件來查詢操作的狀態 future.isCancelled(); //明確檢查操作是否被取消,如果操作在正常完成之前被取消,則它返回true future.cancel(true); //取消操作 AsynchronousSocketChannel client= future.get(); //使用get()方法,該方法將阻塞等待結果的返回: AsynchronousSocketChannel worker = future.get(10, TimeUnit.SECONDS); //也可以設定阻塞時間 //第二種 AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null); listener.accept( attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() { public void completed( AsynchronousSocketChannel client, Object attachment) { // do whatever with client } public void failed(Throwable exc, Object attachment) { // handle failure } });
2.AsynchronousSocketChannel
非同步版的SocketChannel,提供了兩種的read()和write()方法。
-
-
void read(ByteBuffer buffer, A attachment, CompletionHandler handler);
-
void write(ByteBuffer buffer, A attachment, CompletionHandler handler);
-
Future<Integer> read(ByteBuffer buffer);
-
Future<Integer> write(ByteBuffer buffer);
-
3. CompletionHandler的回撥介面,當IO操作完成的時候,即會呼叫這兩個方法:
-
void complete(V result, A attachment)
當IO操作順利完成的時候被呼叫,對於accept方法返回Socket通道,對於read/write操作,則返回本次寫入或讀取的位元組數。
-
void failed(Throwable exe, A attachment)
當IO操作失敗的時候被呼叫,建議在此方法中對連線等資源進行關閉和釋放。
關於AIO的demo可以參照github地址上的程式碼,實現一個前臺輸入表示式,後端計算後返回結果的功能。
參考文獻
-
如何學習Java的NIO?
-
Java NIO淺析