高階Java工程師必備 ----- 深入分析 Java IO (一)BIO
BIO程式設計
最原始BIO
網路程式設計的基本模型是C/S模型,即兩個程序間的通訊。
服務端提供IP和監聽埠,客戶端通過連線操作想服務端監聽的地址發起連線請求,通過三次握手連線,如果連線成功建立,雙方就可以通過套接字進行通訊。
傳統的同步阻塞模型開發中,ServerSocket負責繫結IP地址,啟動監聽埠;Socket負責發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊。
最原始BIO通訊模型圖:
存在的問題:
- 同一時間,伺服器只能接受來自於客戶端A的請求資訊;雖然客戶端A和客戶端B的請求是同時進行的,但客戶端B傳送的請求資訊只能等到伺服器接受完A的請求資料後,才能被接受。(acceptor只有在接受完client1的請求後才能接受client2的請求)
- 由於伺服器一次只能處理一個客戶端請求,當處理完成並返回後(或者異常時),才能進行第二次請求的處理。很顯然,這樣的處理方式在高併發的情況下,是不能採用的。
一請求一執行緒BIO
那有沒有方法改進呢? ,答案是有的。改進後BIO通訊模型圖:
此種BIO通訊模型的服務端,通常由一個獨立的Acceptor執行緒負責監聽客戶端的連線,它接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,執行緒銷燬。即典型的一請求一應答通宵模型。
程式碼演示
服務端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServer { //預設的埠號 private static int DEFAULT_PORT = 8083; public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("監聽來自於"+DEFAULT_PORT+"的埠資訊"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); SocketServerThread socketServerThread = new SocketServerThread(socket); new Thread(socketServerThread).start(); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //這個wait不涉及到具體的實驗邏輯,只是為了保證守護執行緒在啟動所有執行緒後,進入等待狀態 synchronized (NioSocketServer.class) { try { BioSocketServer.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThread implements Runnable { private Socket socket; public SocketServerThread (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我們收取資訊 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用執行緒,同樣無法解決read方法的阻塞問題, //也就是說read方法處同樣會被阻塞,直到作業系統有資料準備好 int realLen = in.read(contextBytes, 0, maxLen); //讀取資訊 String message = new String(contextBytes , 0 , realLen); //下面列印資訊 System.out.println("伺服器收到來自於埠:" + sourcePort + "的資訊:" + message); //下面開始傳送資訊 out.write("回發響應資訊!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //試圖關閉 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
客戶端:
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.URLDecoder; import java.util.concurrent.CountDownLatch; public class BioSocketClient{ public static void main(String[] args) throws Exception { Integer clientNumber = 20; CountDownLatch countDownLatch = new CountDownLatch(clientNumber); // 分別開始啟動這20個客戶端,併發訪問 for (int index = 0; index < clientNumber; index++, countDownLatch.countDown()) { ClientRequestThread client = new ClientRequestThread(countDownLatch, index); new Thread(client).start(); } // 這個wait不涉及到具體的實驗邏輯,只是為了保證守護執行緒在啟動所有執行緒後,進入等待狀態 synchronized (BioSocketClient.class) { BioSocketClient.class.wait(); } } } /** * 一個ClientRequestThread執行緒模擬一個客戶端請求。 * @author keep_trying */ class ClientRequestThread implements Runnable { private CountDownLatch countDownLatch; /** * 這個執行緒的編號 * @param countDownLatch */ private Integer clientIndex; /** * countDownLatch是java提供的同步計數器。 * 當計數器數值減為0時,所有受其影響而等待的執行緒將會被啟用。這樣保證模擬併發請求的真實性 * @param countDownLatch */ public ClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) { this.countDownLatch = countDownLatch; this.clientIndex = clientIndex; } @Override public void run() { Socket socket = null; OutputStream clientRequest = null; InputStream clientResponse = null; try { socket = new Socket("localhost",8083); clientRequest = socket.getOutputStream(); clientResponse = socket.getInputStream(); //等待,直到SocketClientDaemon完成所有執行緒的啟動,然後所有執行緒一起傳送請求 this.countDownLatch.await(); //傳送請求資訊 clientRequest.write(("這是第" + this.clientIndex + " 個客戶端的請求。 over").getBytes()); clientRequest.flush(); //在這裡等待,直到伺服器返回資訊 System.out.println("第" + this.clientIndex + "個客戶端的請求傳送完成,等待伺服器返回資訊"); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; int realLen; String message = ""; //程式執行到這裡,會一直等待伺服器返回資訊(注意,前提是in和out都不能close,如果close了就收不到伺服器的反饋了) while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) { message += new String(contextBytes , 0 , realLen); } //String messageEncode = new String(message , "UTF-8"); message = URLDecoder.decode(message, "UTF-8"); System.out.println("第" + this.clientIndex + "個客戶端接收到來自伺服器的資訊:" + message); } catch (Exception e) { } finally { try { if(clientRequest != null) { clientRequest.close(); } if(clientResponse != null) { clientResponse.close(); } } catch (IOException e) { } } } }
存在的問題:
- 雖然在伺服器端,請求的處理交給了一個獨立執行緒進行,但是作業系統通知accept()的方式還是單個的。也就是,實際上是伺服器接收到資料報文後的“業務處理過程”可以多執行緒,但是資料報文的接受還是需要一個一個的來(acceptor只有在接受完client1的請求後才能接受client2的請求),下文會驗證。
- 在linux系統中,可以建立的執行緒是有限的。我們可以通過cat /proc/sys/kernel/threads-max命令檢視可以建立的最大執行緒數。當然這個值是可以更改的,但是執行緒越多,CPU切換所需的時間也就越長,用來處理真正業務的需求也就越少。
- 另外,如果您的應用程式大量使用長連線的話,執行緒是不會關閉的。這樣系統資源的消耗更容易失控。
偽非同步I/O程式設計
為了改進這種一連線一執行緒的模型,我們可以使用執行緒池來管理這些執行緒,實現1個或多個執行緒處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽非同步I/O模型“。
偽非同步I/O模型圖:
程式碼演示
只給出服務端,客戶端和上面相同
package demo.com.test.io.bio; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import demo.com.test.io.nio.NioSocketServer; public class BioSocketServerThreadPool { //預設的埠號 private static int DEFAULT_PORT = 8083; //執行緒池 懶漢式的單例 private static ExecutorService executorService = Executors.newFixedThreadPool(60); public static void main(String[] args) { ServerSocket serverSocket = null; try { System.out.println("監聽來自於"+DEFAULT_PORT+"的埠資訊"); serverSocket = new ServerSocket(DEFAULT_PORT); while(true) { Socket socket = serverSocket.accept(); //當然業務處理過程可以交給一個執行緒(這裡可以使用執行緒池),並且執行緒的建立是很耗資源的。 //最終改變不了.accept()只能一個一個接受socket的情況,並且被阻塞的情況 SocketServerThreadPool socketServerThreadPool = new SocketServerThreadPool(socket); executorService.execute(socketServerThreadPool); } } catch(Exception e) { } finally { if(serverSocket != null) { try { serverSocket.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } //這個wait不涉及到具體的實驗邏輯,只是為了保證守護執行緒在啟動所有執行緒後,進入等待狀態 synchronized (NioSocketServer.class) { try { BioSocketServerThreadPool.class.wait(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } class SocketServerThreadPool implements Runnable { private Socket socket; public SocketServerThreadPool (Socket socket) { this.socket = socket; } @Override public void run() { InputStream in = null; OutputStream out = null; try { //下面我們收取資訊 in = socket.getInputStream(); out = socket.getOutputStream(); Integer sourcePort = socket.getPort(); int maxLen = 1024; byte[] contextBytes = new byte[maxLen]; //使用執行緒,同樣無法解決read方法的阻塞問題, //也就是說read方法處同樣會被阻塞,直到作業系統有資料準備好 int realLen = in.read(contextBytes, 0, maxLen); //讀取資訊 String message = new String(contextBytes , 0 , realLen); //下面列印資訊 System.out.println("伺服器收到來自於埠:" + sourcePort + "的資訊:" + message); //下面開始傳送資訊 out.write("回發響應資訊!".getBytes()); } catch(Exception e) { System.out.println(e.getMessage()); } finally { //試圖關閉 try { if(in != null) { in.close(); } if(out != null) { out.close(); } if(this.socket != null) { this.socket.close(); } } catch (IOException e) { System.out.println(e.getMessage()); } } } }
伺服器端的執行效果
在 Socket socket = serverSocket.accept(); 處打了斷點,有20個客戶端同時發出請求,可服務端還是一個一個的處理,其它執行緒都處於阻塞狀態
推薦部落格
程式設計師寫程式碼之外,如何再賺一份工資?
阻塞的問題根源
那麼重點的問題並不是“是否使用了多執行緒、或是執行緒池”,而是為什麼accept()、read()方法會被阻塞。API文件中對於 serverSocket.accept() 方法的使用描述:
Listens for a connection to be made to this socket and accepts it. The method blocks until a connection is made.
伺服器執行緒發起一個accept動作,詢問作業系統 是否有新的socket套接字資訊從埠xx傳送過來。
注意,是詢問作業系統。也就是說socket套接字的IO模式支援是基於作業系統的,那麼自然同步IO/非同步IO的支援就是需要作業系統級別的了。如下圖:
如果作業系統沒有發現有套接字從指定的埠xx來,那麼作業系統就會等待。這樣serverSocket.accept()方法就會一直等待。這就是為什麼accept()方法為什麼會阻塞:它內部的實現是使用的作業系統級別的同步IO。
- 阻塞IO 和 非阻塞IO
這兩個概念是程式級別的。主要描述的是程式請求作業系統IO操作後,如果IO資源沒有準備好,那麼程式該如何處理的問題:前者等待;後者繼續執行(並且使用執行緒一直輪詢,直到有IO資源準備好了) - 同步IO 和非同步IO
這兩個概念是作業系統級別的。主要描述的是作業系統在收到程式請求IO操作後,如果IO資源沒有準備好,該如何處理相應程式的問題:前者不響應,直到IO資源準備好以後;後者返回一個標記(好讓程式和自己知道以後的資料往哪裡通知),當IO資源準備好以後,再用事件機制返回給程式。