《Netty 權威指南》—— 偽非同步IO程式設計
宣告:本文是《Netty 權威指南》的樣章,感謝博文視點授權併發程式設計網站釋出樣章,
為了解決同步阻塞IO面臨的一個鏈路需要一個執行緒處理的問題,後來有人對它的執行緒模型進行了優化,後端通過一個執行緒池來處理多個客戶端的請求接入,形成客戶端個數M:執行緒池最大執行緒數N的比例關係,其中M可以遠遠大於N,通過執行緒池可以靈活的調配執行緒資源,設定執行緒的最大值,防止由於海量併發接入導致執行緒耗盡。 下面,我們結合連線模型圖和原始碼,對偽非同步IO進行分析,看它是否能夠解決同步阻塞IO面臨的問題。
當有新的客戶端接入的時候,將客戶端的Socket封裝成一個Task(該任務實現java.lang.Runnable介面)投遞到後端的執行緒池中進行處理,JDK的執行緒池維護一個訊息佇列和N個活躍執行緒對訊息佇列中的任務進行處理。由於執行緒池可以設定訊息佇列的大小和最大執行緒數,因此,它的資源佔用是可控的,無論多少個客戶端併發訪問,都不會導致資源的耗盡和宕機。 下面的小節,我們依然採用時間伺服器程式,將其改造成偽非同步IO時間伺服器,然後通過對程式碼進行分析,找出其弊端。
public class TimeServer { /** * @param args * @throws IOException */ public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 採用預設值 } } ServerSocket server = null; try { server = new ServerSocket(port); System.out.println("The time server is start in port : " + port); Socket socket = null; TimeServerHandlerExecutePool singleExecutor = new TimeServerHandlerExecutePool( 50, 10000);// 建立IO任務執行緒池 while (true) { socket = server.accept(); singleExecutor.execute(new TimeServerHandler(socket)); } } finally { if (server != null) { System.out.println("The time server close"); server.close(); server = null; } } } }
偽非同步IO的主函式程式碼發生了變化,我們首先建立一個時間伺服器處理類的執行緒池,當接收到新的客戶端連線的時候,將請求Socket封裝成一個Task,然後呼叫執行緒池的execute方法執行,從而避免了每個請求接入都建立一個新的執行緒。 偽非同步IO的TimeServerHandlerExecutePool:
public class TimeServerHandlerExecutePool { private ExecutorService executor; public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) { executor = new ThreadPoolExecutor(Runtime.getRuntime() .availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize)); } public void execute(java.lang.Runnable task) { executor.execute(task); } }
由於執行緒池和訊息佇列都是有界的,因此,無論客戶端併發連線數多大,它都不會導致執行緒個數過於膨脹或者記憶體溢位,相比於傳統的一連線一執行緒模型,是一種改良。 由於客戶端程式碼並沒有改變,因此,我們直接執行服務端和客戶端,看執行結果: 服務端執行結果:
/** * Reads some number of bytes from the input stream and stores them into * the buffer array <code>b</code>. The number of bytes actually read is * returned as an integer. This method blocks until input data is * available, end of file is detected, or an exception is thrown. * * If the length of <code>b</code> is zero, then no bytes are read and * <code>0</code> is returned; otherwise, there is an attempt to read at * least one byte. If no byte is available because the stream is at the * end of the file, the value <code>-1</code> is returned; otherwise, at * least one byte is read and stored into <code>b</code>. * * The first byte read is stored into element <code>b[0]</code>, the * next one into <code>b[1]</code>, and so on. The number of bytes read is, * at most, equal to the length of <code>b</code>. Let <i>k</i> be the * number of bytes actually read; these bytes will be stored in elements * <code>b[0]</code> through <code>b[</code><i>k</i><code>-1]</code>, * leaving elements <code>b[</code><i>k</i><code>]</code> through * <code>b[b.length-1]</code> unaffected. * * @param b the buffer into which the data is read. * @return the total number of bytes read into the buffer, or * <code>-1</code> if there is no more data because the end of * the stream has been reached. * @exception IOException If the first byte cannot be read for any reason * other than the end of the file, if the input stream has been closed, or * if some other I/O error occurs. * @exception NullPointerException if <code>b</code> is <code>null</code>. */ public int read(byte b[]) throws IOException { return read(b, 0, b.length); }
請注意加粗斜體字部分的API說明,當對Socket的輸入流進行讀取操作的時候,它會一直阻塞下去,直到發生如下三種事件: 1) 有資料可讀 2) 可用資料已經讀取完畢 3) 發生空指標或者IO異常 這意味著當對方傳送請求或者應答訊息比較緩慢、或者網路傳輸較慢時,讀取輸入流一方的通訊執行緒將被長時間阻塞,如果對方60S才能夠將資料傳送完成,讀取一方的IO執行緒也將會被同步阻塞60S,在此期間,其它接入訊息只能在訊息佇列中排隊。 下面我們接著對輸出流進行分析,還是看JDK IO類庫輸出流的API文件,然後結合文件說明進行故障分析。 Java 輸入流OutputStream:
public void write(byte b[]) throws IOException *Writes an array of bytes. This method will block until the bytes are *actually written. Parameters: b - the data to be written Throws: IOException If an I/O error has occurred.
當呼叫OutputStream的write方法寫輸出流的時候,它將會被阻塞直到所有要傳送的位元組全部寫入完畢,或者發生異常。學習過TCP/IP相關知識的都知道,當訊息的接收方處理緩慢的時候,將不能及時的從TCP緩衝區讀取資料,這將會導致傳送方的TCP window size不斷減小,直到為0,雙方處於Keep-Alive狀態,訊息傳送方將不能再向TCP緩衝區寫入訊息,這時如果採用的是同步阻塞IO,write操作將會被無限期阻塞,直到TCP window size大於0或者發生IO異常。
- 服務端處理緩慢,返回應答訊息耗費60S,平時只需要10MS;
- 採用偽非同步IO的執行緒正在讀取故障服務節點的響應,由於讀取輸入流是阻塞的,因此,它將會被同步阻塞60S;
- 假如所有的可用執行緒都被故障伺服器阻塞,那後續所有的IO訊息都將在佇列中排隊;
- 由於執行緒池採用阻塞佇列實現,當佇列積滿之後,後續入佇列的操作將被阻塞;
- 由於前端只有一個Accptor執行緒接收客戶端接入,它被阻塞線上程池的同步阻塞佇列之後,新的客戶端請求訊息將被拒絕,客戶端會發生大量的連線超時;
- 由於幾乎所有的連線都超時,呼叫者會認為系統已經崩潰,無法接收新的請求訊息。