1. 程式人生 > >BIO/NIO/AIO的具體實現

BIO/NIO/AIO的具體實現

1、BIO程式設計

    1.1、傳統的BIO程式設計

    網路程式設計的基本模型是C/S模型,即兩個程序間的通訊。

    服務端提供IP和監聽埠,客戶端通過連線操作想服務端監聽的地址發起連線請求,通過三次握手連線,如果連線成功建立,雙方就可以通過套接字進行通訊。

    傳統的同步阻塞模型開發中,ServerSocket負責繫結IP地址,啟動監聽埠;Socket負責發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊。 

    簡單的描述一下BIO的服務端通訊模型:採用BIO通訊模型的服務端,通常由一個獨立的Acceptor執行緒負責監聽客戶端的連線,它接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,執行緒銷燬。即典型的一請求一應答通宵模型。

    傳統BIO通訊模型圖:

          這裡寫圖片描述

     01

該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的執行緒個數和客戶端併發訪問數呈1:1的正比關係,Java中的執行緒也是比較寶貴的系統資源,執行緒數量快速膨脹後,系統的效能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了。

同步阻塞式I/O建立的Server原始碼:

  1. package com.anxpp.io.calculator.bio;

  2. import java.io.IOException;

  3. import java.net.ServerSocket;

  4. import java.net.Socket;

  5. /**

  6. * BIO服務端原始碼

  7. * @author yangtao__anxpp.com

  8. * @version 1.0

  9. */

  10. public final class ServerNormal {

  11. //預設的埠號

  12. private static int DEFAULT_PORT = 12345;

  13. //單例的ServerSocket

  14. private static ServerSocket server;

  15. //根據傳入引數設定監聽埠,如果沒有引數呼叫以下方法並使用預設值

  16. public static void start() throws IOException{

  17. //使用預設值

  18. start(DEFAULT_PORT);

  19. }

  20. //這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了

  21. public synchronized static void start(int port) throws IOException{

  22. if(server != null) return;

  23. try{

  24. //通過建構函式建立ServerSocket

  25. //如果埠合法且空閒,服務端就監聽成功

  26. server = new ServerSocket(port);

  27. System.out.println("伺服器已啟動,埠號:" + port);

  28. //通過無線迴圈監聽客戶端連線

  29. //如果沒有客戶端接入,將阻塞在accept操作上。

  30. while(true){

  31. Socket socket = server.accept();

  32. //當有新的客戶端接入時,會執行下面的程式碼

  33. //然後建立一個新的執行緒處理這條Socket鏈路

  34. new Thread(new ServerHandler(socket)).start();

  35. }

  36. }finally{

  37. //一些必要的清理工作

  38. if(server != null){

  39. System.out.println("伺服器已關閉。");

  40. server.close();

  41. server = null;

  42. }

  43. }

  44. }

  45. }

客戶端訊息處理執行緒ServerHandler原始碼:

  1. package com.anxpp.io.calculator.bio;

  2. import java.io.BufferedReader;

  3. import java.io.IOException;

  4. import java.io.InputStreamReader;

  5. import java.io.PrintWriter;

  6. import java.net.Socket;

  7. import com.anxpp.io.utils.Calculator;

  8. /**

  9. * 客戶端執行緒

  10. * @author yangtao__anxpp.com

  11. * 用於處理一個客戶端的Socket鏈路

  12. */

  13. public class ServerHandler implements Runnable{

  14. private Socket socket;

  15. public ServerHandler(Socket socket) {

  16. this.socket = socket;

  17. }

  18. @Override

  19. public void run() {

  20. BufferedReader in = null;

  21. PrintWriter out = null;

  22. try{

  23. in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  24. out = new PrintWriter(socket.getOutputStream(),true);

  25. String expression;

  26. String result;

  27. while(true){

  28. //通過BufferedReader讀取一行

  29. //如果已經讀到輸入流尾部,返回null,退出迴圈

  30. //如果得到非空值,就嘗試計算結果並返回

  31. if((expression = in.readLine())==null) break;

  32. System.out.println("伺服器收到訊息:" + expression);

  33. try{

  34. result = Calculator.cal(expression).toString();

  35. }catch(Exception e){

  36. result = "計算錯誤:" + e.getMessage();

  37. }

  38. out.println(result);

  39. }

  40. }catch(Exception e){

  41. e.printStackTrace();

  42. }finally{

  43. //一些必要的清理工作

  44. if(in != null){

  45. try {

  46. in.close();

  47. } catch (IOException e) {

  48. e.printStackTrace();

  49. }

  50. in = null;

  51. }

  52. if(out != null){

  53. out.close();

  54. out = null;

  55. }

  56. if(socket != null){

  57. try {

  58. socket.close();

  59. } catch (IOException e) {

  60. e.printStackTrace();

  61. }

  62. socket = null;

  63. }

  64. }

  65. }

  66. }

同步阻塞式I/O建立的Client原始碼:

  1. package com.anxpp.io.calculator.bio;

  2. import java.io.BufferedReader;

  3. import java.io.IOException;

  4. import java.io.InputStreamReader;

  5. import java.io.PrintWriter;

  6. import java.net.Socket;

  7. /**

  8. * 阻塞式I/O建立的客戶端

  9. * @author yangtao__anxpp.com

  10. * @version 1.0

  11. */

  12. public class Client {

  13. //預設的埠號

  14. private static int DEFAULT_SERVER_PORT = 12345;

  15. private static String DEFAULT_SERVER_IP = "127.0.0.1";

  16. public static void send(String expression){

  17. send(DEFAULT_SERVER_PORT,expression);

  18. }

  19. public static void send(int port,String expression){

  20. System.out.println("算術表示式為:" + expression);

  21. Socket socket = null;

  22. BufferedReader in = null;

  23. PrintWriter out = null;

  24. try{

  25. socket = new Socket(DEFAULT_SERVER_IP,port);

  26. in = new BufferedReader(new InputStreamReader(socket.getInputStream()));

  27. out = new PrintWriter(socket.getOutputStream(),true);

  28. out.println(expression);

  29. System.out.println("___結果為:" + in.readLine());

  30. }catch(Exception e){

  31. e.printStackTrace();

  32. }finally{

  33. //一下必要的清理工作

  34. if(in != null){

  35. try {

  36. in.close();

  37. } catch (IOException e) {

  38. e.printStackTrace();

  39. }

  40. in = null;

  41. }

  42. if(out != null){

  43. out.close();

  44. out = null;

  45. }

  46. if(socket != null){

  47. try {

  48. socket.close();

  49. } catch (IOException e) {

  50. e.printStackTrace();

  51. }

  52. socket = null;

  53. }

  54. }

  55. }

  56. }

 測試程式碼,為了方便在控制檯看輸出結果,放到同一個程式(jvm)中執行:

  1. package com.anxpp.io.calculator.bio;

  2. import java.io.IOException;

  3. import java.util.Random;

  4. /**

  5. * 測試方法

  6. * @author yangtao__anxpp.com

  7. * @version 1.0

  8. */

  9. public class Test {

  10. //測試主方法

  11. public static void main(String[] args) throws InterruptedException {

  12. //執行伺服器

  13. new Thread(new Runnable() {

  14. @Override

  15. public void run() {

  16. try {

  17. ServerBetter.start();

  18. } catch (IOException e) {

  19. e.printStackTrace();

  20. }

  21. }

  22. }).start();

  23. //避免客戶端先於伺服器啟動前執行程式碼

  24. Thread.sleep(100);

  25. //執行客戶端

  26. char operators[] = {'+','-','*','/'};

  27. Random random = new Random(System.currentTimeMillis());

  28. new Thread(new Runnable() {

  29. @SuppressWarnings("static-access")

  30. @Override

  31. public void run() {

  32. while(true){

  33. //隨機產生算術表示式

  34. String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);

  35. Client.send(expression);

  36. try {

  37. Thread.currentThread().sleep(random.nextInt(1000));

  38. } catch (InterruptedException e) {

  39. e.printStackTrace();

  40. }

  41. }

  42. }

  43. }).start();

  44. }

  45. }

其中一次的執行結果:

  1. 伺服器已啟動,埠號:12345

  2. 算術表示式為:4-2

  3. 伺服器收到訊息:4-2

  4. ___結果為:2

  5. 算術表示式為:5-10

  6. 伺服器收到訊息:5-10

  7. ___結果為:-5

  8. 算術表示式為:0-9

  9. 伺服器收到訊息:0-9

  10. ___結果為:-9

  11. 算術表示式為:0+6

  12. 伺服器收到訊息:0+6

  13. ___結果為:6

  14. 算術表示式為:1/6

  15. 伺服器收到訊息:1/6

  16. ___結果為:0.16666666666666666

從以上程式碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須建立一個新的執行緒來處理這條鏈路,在需要滿足高效能、高併發的場景是沒法應用的(大量建立新的執行緒會嚴重影響伺服器效能,甚至罷工)。

1.2、偽非同步I/O程式設計

    為了改進這種一連線一執行緒的模型,我們可以使用執行緒池來管理這些執行緒(需要了解更多請參考前面提供的文章),實現1個或多個執行緒處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽非同步I/O模型“。

    偽非同步I/O模型圖:

02

實現很簡單,我們只需要將新建執行緒的地方,交給執行緒池管理即可,只需要改動剛剛的Server程式碼即可:

  1. package com.anxpp.io.calculator.bio;

  2. import java.io.IOException;

  3. import java.net.ServerSocket;

  4. import java.net.Socket;

  5. import java.util.concurrent.ExecutorService;

  6. import java.util.concurrent.Executors;

  7. /**

  8. * BIO服務端原始碼__偽非同步I/O

  9. * @author yangtao__anxpp.com

  10. * @version 1.0

  11. */

  12. public final class ServerBetter {

  13. //預設的埠號

  14. private static int DEFAULT_PORT = 12345;

  15. //單例的ServerSocket

  16. private static ServerSocket server;

  17. //執行緒池 懶漢式的單例

  18. private static ExecutorService executorService = Executors.newFixedThreadPool(60);

  19. //根據傳入引數設定監聽埠,如果沒有引數呼叫以下方法並使用預設值

  20. public static void start() throws IOException{

  21. //使用預設值

  22. start(DEFAULT_PORT);

  23. }

  24. //這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了

  25. public synchronized static void start(int port) throws IOException{

  26. if(server != null) return;

  27. try{

  28. //通過建構函式建立ServerSocket

  29. //如果埠合法且空閒,服務端就監聽成功

  30. server = new ServerSocket(port);

  31. System.out.println("伺服器已啟動,埠號:" + port);

  32. //通過無線迴圈監聽客戶端連線

  33. //如果沒有客戶端接入,將阻塞在accept操作上。

  34. while(true){

  35. Socket socket = server.accept();

  36. //當有新的客戶端接入時,會執行下面的程式碼

  37. //然後建立一個新的執行緒處理這條Socket鏈路

  38. executorService.execute(new ServerHandler(socket));

  39. }

  40. }finally{

  41. //一些必要的清理工作

  42. if(server != null){

  43. System.out.println("伺服器已關閉。");

  44. server.close();

  45. server = null;

  46. }

  47. }

  48. }

  49. }

測試執行結果是一樣的。

    我們知道,如果使用CachedThreadPool執行緒池(不限制執行緒數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理執行緒(複用),看起來也就像是1:1的客戶端:執行緒數模型,而使用FixedThreadPool我們就有效的控制了執行緒的最大數量,保證了系統有限的資源的控制,實現了N:M的偽非同步I/O模型。

    但是,正因為限制了執行緒數量,如果發生大量併發請求,超過最大數量的執行緒就只能等待,直到執行緒池中的有空閒的執行緒可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

  •     有資料可讀
  •     可用資料以及讀取完畢
  •     發生空指標或I/O異常

    所以在讀取資料較慢時(比如資料量大、網路傳輸慢等),大量併發的情況下,其他接入的訊息,只能一直等待,這就是最大的弊端。

    而後面即將介紹的NIO,就能解決這個難題。

2、NIO 程式設計

    JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO程式設計,也能從中受益。速度的提高在檔案I/O和網路I/O中都可能會發生,但本文只討論後者。

    2.1、簡介

    NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

    NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

    新增的著兩種通道都支援阻塞和非阻塞兩種模式。

    阻塞模式使用就像傳統中的支援一樣,比較簡單,但是效能和可靠性都不好;非阻塞模式正好與之相反。

    對於低負載、低併發的應用程式,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網路)應用,應使用NIO的非阻塞模式來開發。

這裡寫圖片描述

    下面會先對基礎知識進行介紹。

    2.2、緩衝區 Buffer

Buffer是一個物件,包含一些要寫入或者讀出的資料。 在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入資料時,也是寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。

緩衝區實際上是一個數組,並提供了對資料結構化訪問以及維護讀寫位置等資訊。

具體的快取區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的介面:Buffer。

Buffer有兩種工作模式:寫模式和讀模式。在讀模式下,應用程式只能從Buffer中讀取資料,不能進行寫操作。但是在寫模式下,應用程式是可以進行讀操作的,這就表示可能會出現髒讀的情況。所以一旦您決定要從Buffer中讀取資料,一定要將Buffer的狀態改為讀模式。

這裡寫圖片描述

    2.3、通道 Channel

    我們對資料的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

    底層的作業系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的對映底層作業系統的API。

    Channel主要分兩大類:

  •     SelectableChannel:使用者網路讀寫
  •     FileChannel:用於檔案操作

    後面程式碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。所有被Selector(選擇器)註冊的通道,只能是繼承了SelectableChannel類的子類。ServerSocketChannel同時支援UDP協議和TCP協議,SocketChannel支援TCP協議,DatagramChannel支援UDP報文。

這裡寫圖片描述

    2.4、多路複用器 Selector

Selector是Java  NIO 程式設計的基礎。

Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連線控制代碼1024/2048的限制。所以,只需要一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端。

應用程式將向Selector物件註冊需要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。Selector中也會維護一個“已經註冊的Channel”的容器。以下程式碼來自WindowsSelectorImpl實現類中,對已經註冊的Channel的管理容器:

  1. // Initial capacity of the poll array

  2. private final int INIT_CAP = 8;

  3. // Maximum number of sockets for select().

  4. // Should be INIT_CAP times a power of 2

  5. private final static int MAX_SELECTABLE_FDS = 1024;

  6. // The list of SelectableChannels serviced by this Selector. Every mod

  7. // MAX_SELECTABLE_FDS entry is bogus, to align this array with the poll

  8. // array, where the corresponding entry is occupied by the wakeupSocket

  9. private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];

多路複用IO技術是需要作業系統進行支援的,其特點就是作業系統可以同時掃描同一個埠上不同網路連線的時間。所以作為上層的JVM,必須要為不同作業系統的多路複用IO實現 編寫不同的程式碼。Windows環境下對應的實現類是sun.nio.ch.WindowsSelectorImpl:  

    2.5、NIO服務端

    程式碼比傳統的Socket程式設計看起來要複雜不少。

    直接貼程式碼吧,以註釋的形式給出程式碼說明。

    NIO建立的Server原始碼:

  1. package com.anxpp.io.calculator.nio;

  2. public class Server {

  3. private static int DEFAULT_PORT = 12345;

  4. private static ServerHandle serverHandle;

  5. public static void start(){

  6. start(DEFAULT_PORT);

  7. }

  8. public static synchronized void start(int port){

  9. if(serverHandle!=null)

  10. serverHandle.stop();

  11. serverHandle = new ServerHandle(port);

  12. new Thread(serverHandle,"Server").start();

  13. }

  14. public static void main(String[] args){

  15. start();

  16. }

  17. }

ServerHandle:

  1. package com.anxpp.io.calculator.nio;

  2. import java.io.IOException;

  3. import java.net.InetSocketAddress;

  4. import java.nio.ByteBuffer;

  5. import java.nio.channels.SelectionKey;

  6. import java.nio.channels.Selector;

  7. import java.nio.channels.ServerSocketChannel;

  8. import java.nio.channels.SocketChannel;

  9. import java.util.Iterator;

  10. import java.util.Set;

  11. import com.anxpp.io.utils.Calculator;

  12. /**

  13. * NIO服務端

  14. * @author yangtao__anxpp.com

  15. * @version 1.0

  16. */

  17. public class ServerHandle implements Runnable{

  18. private Selector selector;

  19. private ServerSocketChannel serverChannel;

  20. private volatile boolean started;

  21. /**

  22. * 構造方法

  23. * @param port 指定要監聽的埠號

  24. */

  25. public ServerHandle(int port) {

  26. try{

  27. //建立選擇器

  28. selector = Selector.open();

  29. //開啟監聽通道

  30. serverChannel = ServerSocketChannel.open();

  31. //如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式

  32. serverChannel.configureBlocking(false);//開啟非阻塞模式

  33. //繫結埠 backlog設為1024

  34. serverChannel.socket().bind(new InetSocketAddress(port),1024);

  35. //監聽客戶端連線請求

  36. serverChannel.register(selector, SelectionKey.OP_ACCEPT);

  37. //標記伺服器已開啟

  38. started = true;

  39. System.out.println("伺服器已啟動,埠號:" + port);

  40. }catch(IOException e){

  41. e.printStackTrace();

  42. System.exit(1);

  43. }

  44. }

  45. public void stop(){

  46. started = false;

  47. }

  48. @Override

  49. public void run() {

  50. //迴圈遍歷selector

  51. while(started){

  52. try{

  53. //無論是否有讀寫事件發生,selector每隔1s被喚醒一次

  54. selector.select(1000);

  55. //阻塞,只有當至少一個註冊的事件發生的時候才會繼續.

  56. // selector.select();

  57. Set<SelectionKey> keys = selector.selectedKeys();

  58. Iterator<SelectionKey> it = keys.iterator();

  59. SelectionKey key = null;

  60. while(it.hasNext()){

  61. key = it.next();

  62. it.remove();

  63. try{

  64. handleInput(key);

  65. }catch(Exception e){

  66. if(key != null){

  67. key.cancel();

  68. if(key.channel() != null){

  69. key.channel().close();

  70. }

  71. }

  72. }

  73. }

  74. }catch(Throwable t){

  75. t.printStackTrace();

  76. }

  77. }

  78. //selector關閉後會自動釋放裡面管理的資源

  79. if(selector != null)

  80. try{

  81. selector.close();

  82. }catch (Exception e) {

  83. e.printStackTrace();

  84. }

  85. }

  86. private void handleInput(SelectionKey key) throws IOException{

  87. if(key.isValid()){

  88. //處理新接入的請求訊息

  89. if(key.isAcceptable()){

  90. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

  91. //通過ServerSocketChannel的accept建立SocketChannel例項

  92. //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立

  93. SocketChannel sc = ssc.accept();

  94. //設定為非阻塞的

  95. sc.configureBlocking(false);

  96. //註冊為讀

  97. sc.register(selector, SelectionKey.OP_READ);

  98. }

  99. //讀訊息

  100. if(key.isReadable()){

  101. SocketChannel sc = (SocketChannel) key.channel();

  102. //建立ByteBuffer,並開闢一個1M的緩衝區

  103. ByteBuffer buffer = ByteBuffer.allocate(1024);

  104. //讀取請求碼流,返回讀取到的位元組數

  105. int readBytes = sc.read(buffer);

  106. //讀取到位元組,對位元組進行編解碼

  107. if(readBytes>0){

  108. //將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作

  109. buffer.flip();

  110. //根據緩衝區可讀位元組數建立位元組陣列

  111. byte[] bytes = new byte[buffer.remaining()];

  112. //將緩衝區可讀位元組陣列複製到新建的陣列中

  113. buffer.get(bytes);

  114. String expression = new String(bytes,"UTF-8");

  115. System.out.println("伺服器收到訊息:" + expression);

  116. //處理資料

  117. String result = null;

  118. try{

  119. result = Calculator.cal(expression).toString();

  120. }catch(Exception e){

  121. result = "計算錯誤:" + e.getMessage();

  122. }

  123. //傳送應答訊息

  124. doWrite(sc,result);

  125. }

  126. //沒有讀取到位元組 忽略

  127. // else if(readBytes==0);

  128. //鏈路已經關閉,釋放資源

  129. else if(readBytes<0){

  130. key.cancel();

  131. sc.close();

  132. }

  133. }

  134. }

  135. }

  136. //非同步傳送應答訊息

  137. private void doWrite(SocketChannel channel,String response) throws IOException{

  138. //將訊息編碼為位元組陣列

  139. byte[] bytes = response.getBytes();

  140. //根據陣列容量建立ByteBuffer

  141. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

  142. //將位元組陣列複製到緩衝區

  143. writeBuffer.put(bytes);

  144. //flip操作

  145. writeBuffer.flip();

  146. //傳送緩衝區的位元組陣列

  147. channel.write(writeBuffer);

  148. //****此處不含處理“寫半包”的程式碼

  149. }

  150. }

可以看到,建立NIO服務端的主要步驟如下:

  1.     開啟ServerSocketChannel,監聽客戶端連線
  2.     繫結監聽埠,設定連線為非阻塞模式
  3.     建立Reactor執行緒,建立多路複用器並啟動執行緒
  4.     將ServerSocketChannel註冊到Reactor執行緒中的Selector上,監聽ACCEPT事件
  5.     Selector輪詢準備就緒的key
  6.     Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
  7.     設定客戶端鏈路為非阻塞模式
  8.     將新接入的客戶端連線註冊到Reactor執行緒的Selector上,監聽讀操作,讀取客戶端傳送的網路訊息
  9.     非同步讀取客戶端訊息到緩衝區
  10.     對Buffer編解碼,處理半包訊息,將解碼成功的訊息封裝成Task
  11.     將應答訊息編碼為Buffer,呼叫SocketChannel的write將訊息非同步傳送給客戶端

    因為應答訊息的傳送,SocketChannel也是非同步非阻塞的,所以不能保證一次能吧需要傳送的資料傳送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的訊息傳送完畢,然後通過Buffer的hasRemain()方法判斷訊息是否傳送完成。

    2.6、NIO客戶端

    還是直接上程式碼吧,過程也不需要太多解釋了,跟服務端程式碼有點類似。

    Client:

  1. package com.anxpp.io.calculator.nio;

  2. public class Client {

  3. private static String DEFAULT_HOST = "127.0.0.1";

  4. private static int DEFAULT_PORT = 12345;

  5. private static ClientHandle clientHandle;

  6. public static void start(){

  7. start(DEFAULT_HOST,DEFAULT_PORT);

  8. }

  9. public static synchronized void start(String ip,int port){

  10. if(clientHandle!=null)

  11. clientHandle.stop();

  12. clientHandle = new ClientHandle(ip,port);

  13. new Thread(clientHandle,"Server").start();

  14. }

  15. //向伺服器傳送訊息

  16. public static boolean sendMsg(String msg) throws Exception{

  17. if(msg.equals("q")) return false;

  18. clientHandle.sendMsg(msg);

  19. return true;

  20. }

  21. public static void main(String[] args){

  22. start();

  23. }

  24. }

 ClientHandle:

  1. package com.anxpp.io.calculator.nio;

  2. import java.io.IOException;

  3. import java.net.InetSocketAddress;

  4. import java.nio.ByteBuffer;

  5. import java.nio.channels.SelectionKey;

  6. import java.nio.channels.Selector;

  7. import java.nio.channels.SocketChannel;

  8. import java.util.Iterator;

  9. import java.util.Set;

  10. /**

  11. * NIO客戶端

  12. * @author yangtao__anxpp.com

  13. * @version 1.0

  14. */

  15. public class ClientHandle implements Runnable{

  16. private String host;

  17. private int port;

  18. private Selector selector;

  19. private SocketChannel socketChannel;

  20. private volatile boolean started;

  21. public ClientHandle(String ip,int port) {

  22. this.host = ip;

  23. this.port = port;

  24. try{

  25. //建立選擇器

  26. selector = Selector.open();

  27. //開啟監聽通道

  28. socketChannel = SocketChannel.open();

  29. //如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式

  30. socketChannel.configureBlocking(false);//開啟非阻塞模式

  31. started = true;

  32. }catch(IOException e){

  33. e.printStackTrace();

  34. System.exit(1);

  35. }

  36. }

  37. public void stop(){

  38. started = false;

  39. }

  40. @Override

  41. public void run() {

  42. try{

  43. doConnect();

  44. }catch(IOException e){

  45. e.printStackTrace();

  46. System.exit(1);

  47. }

  48. //迴圈遍歷selector

  49. while(started){

  50. try{

  51. //無論是否有讀寫事件發生,selector每隔1s被喚醒一次

  52. selector.select(1000);

  53. //阻塞,只有當至少一個註冊的事件發生的時候才會繼續.

  54. // selector.select();

  55. Set<SelectionKey> keys = selector.selectedKeys();

  56. Iterator<SelectionKey> it = keys.iterator();

  57. SelectionKey key = null;

  58. while(it.hasNext()){

  59. key = it.next();

  60. it.remove();

  61. try{

  62. handleInput(key);

  63. }catch(Exception e){

  64. if(key != null){

  65. key.cancel();

  66. if(key.channel() != null){

  67. key.channel().close();

  68. }

  69. }

  70. }

  71. }

  72. }catch(Exception e){

  73. e.printStackTrace();

  74. System.exit(1);

  75. }

  76. }

  77. //selector關閉後會自動釋放裡面管理的資源

  78. if(selector != null)

  79. try{

  80. selector.close();

  81. }catch (Exception e) {

  82. e.printStackTrace();

  83. }

  84. }

  85. private void handleInput(SelectionKey key) throws IOException{

  86. if(key.isValid()){

  87. SocketChannel sc = (SocketChannel) key.channel();

  88. if(key.isConnectable()){

  89. if(sc.finishConnect());

  90. else System.exit(1);

  91. }

  92. //讀訊息

  93. if(key.isReadable()){

  94. //建立ByteBuffer,並開闢一個1M的緩衝區

  95. ByteBuffer buffer = ByteBuffer.allocate(1024);

  96. //讀取請求碼流,返回讀取到的位元組數

  97. int readBytes = sc.read(buffer);

  98. //讀取到位元組,對位元組進行編解碼

  99. if(readBytes>0){

  100. //將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作

  101. buffer.flip();

  102. //根據緩衝區可讀位元組數建立位元組陣列

  103. byte[] bytes = new byte[buffer.remaining()];

  104. //將緩衝區可讀位元組陣列複製到新建的陣列中

  105. buffer.get(bytes);

  106. String result = new String(bytes,"UTF-8");

  107. System.out.println("客戶端收到訊息:" + result);

  108. }

  109. //沒有讀取到位元組 忽略

  110. // else if(readBytes==0);

  111. //鏈路已經關閉,釋放資源

  112. else if(readBytes<0){

  113. key.cancel();

  114. sc.close();

  115. }

  116. }

  117. }

  118. }

  119. //非同步傳送訊息

  120. private void doWrite(SocketChannel channel,String request) throws IOException{

  121. //將訊息編碼為位元組陣列

  122. byte[] bytes = request.getBytes();

  123. //根據陣列容量建立ByteBuffer

  124. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

  125. //將位元組陣列複製到緩衝區

  126. writeBuffer.put(bytes);

  127. //flip操作

  128. writeBuffer.flip();

  129. //傳送緩衝區的位元組陣列

  130. channel.write(writeBuffer);

  131. //****此處不含處理“寫半包”的程式碼

  132. }

  133. private void doConnect() throws IOException{

  134. if(socketChannel.connect(new InetSocketAddress(host,port)));

  135. else socketChannel.register(selector, SelectionKey.OP_CONNECT);

  136. }

  137. public void sendMsg(String msg) throws Exception{

  138. socketChannel.register(selector, SelectionKey.OP_READ);

  139. doWrite(socketChannel, msg);

  140. }

  141. }

2.7、演示結果

    首先執行伺服器,順便也執行一個客戶端:

  1. package com.anxpp.io.calculator.nio;

  2. import java.util.Scanner;

  3. /**

  4. * 測試方法

  5. * @author yangtao__anxpp.com

  6. * @version 1.0

  7. */

  8. public class Test {

  9. //測試主方法

  10. @SuppressWarnings("resource")

  11. public static void main(String[] args) throws Exception{

  12. //執行伺服器

  13. Server.start();

  14. //避免客戶端先於伺服器啟動前執行程式碼

  15. Thread.sleep(100);

  16. //執行客戶端

  17. Client.start();

  18. while(Client.sendMsg(new Scanner(System.in).nextLine()));

  19. }

  20. }

我們也可以單獨執行客戶端,效果都是一樣的。

    一次測試的結果:

  1. 伺服器已啟動,埠號:12345

  2. 1+2+3+4+5+6

  3. 伺服器收到訊息:1+2+3+4+5+6

  4. 客戶端收到訊息:21

  5. 1*2/3-4+5*6/7-8

  6. 伺服器收到訊息:1*2/3-4+5*6/7-8

  7. 客戶端收到訊息:-7.0476190476190474

3、AIO程式設計

NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。

非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非同步讀寫,從而簡化了NIO的程式設計模型。

非同步IO則是採用“訂閱-通知”模式:即應用程式向作業系統註冊IO監聽,然後繼續做自己的事情。當作業系統發生IO事件,並且準備好資料後,在主動通知應用程式,觸發相應的函式:

這裡寫圖片描述

和同步IO一樣,非同步IO也是由作業系統進行支援的。微軟的windows系統提供了一種非同步IO技術:IOCP(I/O Completion Port,I/O完成埠)(Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”),Linux下由於沒有這種非同步IO技術,所以使用的是epoll(上文介紹過的一種多路複用IO技術的實現)對非同步IO進行模擬。

在JAVA NIO框架中,我們說到了一個重要概念“selector”(選擇器)。它負責代替應用查詢中所有已註冊的通道到作業系統中進行IO事件輪詢、管理當前註冊的通道集合,定位發生事件的通道等操操作;但是在JAVA AIO框架中,由於應用程式不是“輪詢”方式,而是訂閱-通知方式,所以不再需要“selector”(選擇器)了,改由channel通道直接到作業系統註冊監聽。JAVA NIO和JAVA AIO框架,除了因為作業系統的實現不一樣而去掉了Selector外,其他的重要概念都是存在的,例如上文中提到的Channel的概念,還有演示程式碼中使用的Buffer快取方式。

JAVA AIO框架中,只實現了兩種網路IO通道“AsynchronousServerSocketChannel”(伺服器監聽通道)、“AsynchronousSocketChannel”(socket套接字通道)。但是無論哪種通道他們都有獨立的fileDescriptor(檔案識別符號)、attachment(附件,附件可以使任意物件,類似“通道上下文”),並被獨立的SocketChannelReadHandle類例項引用。

直接上程式碼吧。

 3.1、Server端程式碼

    Server:

  1. package com.anxpp.io.calculator.aio.server;

  2. /**

  3. * AIO服務端

  4. * @author yangtao__anxpp.com

  5. * @version 1.0

  6. */

  7. public class Server {

  8. private static int DEFAULT_PORT = 12345;

  9. private static AsyncServerHandler serverHandle;

  10. public volatile static long clientCount = 0;

  11. public static void start(){

  12. start(DEFAULT_PORT);

  13. }

  14. public static synchronized void start(int port){

  15. if(serverHandle!=null)

  16. return;

  17. serverHandle = new AsyncServerHandler(port);

  18. new Thread(serverHandle,"Server").start();

  19. }

  20. public static void main(String[] args){

  21. Server.start();

  22. }

  23. }

   AsyncServerHandler:

  1. package com.anxpp.io.calculator.aio.server;

  2. import java.io.IOException;

  3. import java.net.InetSocketAddress;

  4. import java.nio.channels.AsynchronousServerSocketChannel;

  5. import java.util.concurrent.CountDownLatch;

  6. public class AsyncServerHandler implements Runnable {

  7. public CountDownLatch latch;

  8. public AsynchronousServerSocketChannel channel;

  9. public AsyncServerHandler(int port) {

  10. try {

  11. //建立服務端通道

  12. channel = AsynchronousServerSocketChannel.open();

  13. //繫結埠

  14. channel.bind(new InetSocketAddress(port));

  15. System.out.println("伺服器已啟動,埠號:" + port);

  16. } catch (IOException e) {

  17. e.printStackTrace();

  18. }

  19. }

  20. @Override

  21. public void run() {

  22. //CountDownLatch初始化

  23. //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞

  24. //此處,讓現場在此阻塞,防止服務端執行完成後退出

  25. //也可以使用while(true)+sleep

  26. //生成環境就不需要擔心這個問題,以為服務端是不會退出的

  27. latch = new CountDownLatch(1);

  28. //用於接收客戶端的連線

  29. channel.accept(this,new AcceptHandler());

  30. try {

  31. latch.await();

  32. } catch (InterruptedException e) {

  33. e.printStackTrace();

  34. }

  35. }

  36. }

AcceptHandler:

  1. package com.anxpp.io.calculator.aio.server;

  2. import java.nio.ByteBuffer;

  3. import java.nio.channels.AsynchronousSocketChannel;

  4. import java.nio.channels.CompletionHandler;

  5. //作為handler接收客戶端連線

  6. public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {

  7. @Override

  8. public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {

  9. //繼續接受其他客戶端的請求

  10. Server.clientCount++;

  11. System.out.println("連線的客戶端數:" + Server.clientCount);

  12. serverHandler.channel.accept(serverHandler, this);

  13. //建立新的Buffer

  14. ByteBuffer buffer = ByteBuffer.allocate(1024);

  15. //非同步讀 第三個引數為接收訊息回撥的業務Handler

  16. channel.read(buffer, buffer, new ReadHandler(channel));

  17. }

  18. @Override

  19. public void failed(Throwable exc, AsyncServerHandler serverHandler) {

  20. exc.printStackTrace();

  21. serverHandler.latch.countDown();

  22. }

  23. }

ReadHandler:

  1. package com.anxpp.io.calculator.aio.server;

  2. import java.io.IOException;

  3. import java.io.UnsupportedEncodingException;

  4. import java.nio.ByteBuffer;

  5. import java.nio.channels.AsynchronousSocketChannel;

  6. import java.nio.channels.CompletionHandler;

  7. import com.anxpp.io.utils.Calculator;

  8. public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {

  9. //用於讀取半包訊息和傳送應答

  10. private AsynchronousSocketChannel channel;

  11. public ReadHandler(AsynchronousSocketChannel channel) {

  12. this.channel = channel;

  13. }

  14. //讀取到訊息後的處理

  15. @Override

  16. public void completed(Integer result, ByteBuffer attachment) {

  17. //flip操作

  18. attachment.flip();

  19. //根據

  20. byte[] message = new byte[attachment.remaining()];

  21. attachment.get(message);

  22. try {

  23. String expression = new String(message, "UTF-8");

  24. System.out.println("伺服器收到訊息: " + expression);

  25. String calrResult = null;

  26. try{

  27. calrResult = Calculator.cal(expression).toString();

  28. }catch(Exception e){

  29. calrResult = "計算錯誤:" + e.getMessage();

  30. }

  31. //向客戶端傳送訊息

  32. doWrite(calrResult);

  33. } catch (UnsupportedEncodingException e) {

  34. e.printStackTrace();

  35. }

  36. }

  37. //傳送訊息

  38. private void doWrite(String result) {

  39. byte[] bytes = result.getBytes();

  40. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);

  41. writeBuffer.put(bytes);

  42. writeBuffer.flip();

  43. //非同步寫資料 引數與前面的read一樣

  44. channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {

  45. @Override

  46. public void completed(Integer result, ByteBuffer buffer) {

  47. //如果沒有傳送完,就繼續傳送直到完成

  48. if (buffer.hasRemaining())

  49. channel.write(buffer, buffer, this);

  50. else{

  51. //建立新的Buffer

  52. ByteBuffer readBuffer = ByteBuffer.allocate(1024);

  53. //非同步讀 第三個引數為接收訊息回撥的業務Handler

  54. channel.read(readBuffer, readBuffer, new ReadHandler(channel));

  55. }

  56. }

  57. @Override

  58. public void failed(Throwable exc, ByteBuffer attachment) {

  59. try {

  60. channel.close();

  61. } catch (IOException e) {

  62. }

  63. }

  64. });

  65. }

  66. @Override

  67. public void failed(Throwable exc, ByteBuffer attachment) {

  68. try {

  69. this.channel.close();

  70. } catch (IOException e) {

  71. e.printStackTrace();

  72. }

  73. }

  74. }

 OK,這樣就已經完成了,其實說起來也簡單,雖然程式碼感覺很多,但是API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

    下面看客戶端程式碼。

    3.2、Client端程式碼

    Client:

  1. package com.anxpp.io.calculator.aio.client;

  2. import java.util.Scanner;

  3. public class Client {

  4. private static String DEFAULT_HOST = "127.0.0.1";

  5. private static int DEFAULT_PORT = 12345;

  6. private static AsyncClientHandler clientHandle;

  7. public static void start(){

  8. st