1. 程式人生 > >Java IO 程式設計(BIO、NIO、AIO完整例項程式碼)

Java IO 程式設計(BIO、NIO、AIO完整例項程式碼)

 

    本文會從傳統的BIO到NIO再到AIO自淺至深介紹,並附上程式碼講解。

  原始碼地址: https://github.com/kkman2008/java-IO.git

    下面程式碼中會使用這樣一個例子:客戶端傳送一段算式的字串到伺服器,伺服器計算後返回結果到客戶端。

    程式碼的所有說明,都直接作為註釋,嵌入到程式碼中,看程式碼時就能更容易理解,程式碼中會用到一個計算結果的工具類,見文章程式碼部分。

    相關的基礎知識文章推薦:

    

Linux 網路 I/O 模型簡介(圖文)


    Java 併發(多執行緒)    

1、BIO程式設計

    1.1、傳統的BIO程式設計

    網路程式設計的基本模型是C/S模型,即兩個程序間的通訊。

    服務端提供IP和監聽埠,客戶端通過連線操作想服務端監聽的地址發起連線請求,通過三次握手連線,如果連線成功建立,雙方就可以通過套接字進行通訊。

    傳統的同步阻塞模型開發中,ServerSocket負責繫結IP地址,啟動監聽埠;Socket負責發起連線操作。連線成功後,雙方通過輸入和輸出流進行同步阻塞式通訊。 

    簡單的描述一下BIO的服務端通訊模型:採用BIO通訊模型的服務端,通常由一個獨立的Acceptor執行緒負責監聽客戶端的連線,它接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理沒處理完成後,通過輸出流返回應答給客戶端,執行緒銷燬。即典型的一請求一應答通宵模型。

    傳統BIO通訊模型圖:

    01

    該模型最大的問題就是缺乏彈性伸縮能力,當客戶端併發訪問量增加後,服務端的執行緒個數和客戶端併發訪問數呈1:1的正比關係,Java中的執行緒也是比較寶貴的系統資源,執行緒數量快速膨脹後,系統的效能將急劇下降,隨著訪問量的繼續增大,系統最終就死-掉-了

    同步阻塞式I/O建立的Server原始碼:

 
  1. package com.anxpp.calculator.bio;
  2. import java.io.IOException;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. /**
  8. * BIO服務端原始碼
  9. * @author yangtao__anxpp.com
  10. * @version 1.0
  11. */
  12. public final class Server {
  13. //預設的埠號
  14. private static int DEFAULT_PORT = 12345;
  15. //單例的ServerSocket
  16. private static ServerSocket server;
  17. //根據傳入引數設定監聽埠,如果沒有引數呼叫以下方法並使用預設值
  18. public static void start() throws IOException{
  19. //使用預設值
  20. start(DEFAULT_PORT);
  21. }
  22. //這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
  23. public synchronized static void start(int port) throws IOException{
  24. if(server != null) return;
  25. try{
  26. //通過建構函式建立ServerSocket
  27. //如果埠合法且空閒,服務端就監聽成功
  28. server = new ServerSocket(port);
  29. System.out.println("伺服器已啟動,埠號:" + port);
  30. Socket socket;
  31. //通過無線迴圈監聽客戶端連線
  32. //如果沒有客戶端接入,將阻塞在accept操作上。
  33. while(true){
  34. socket = server.accept();
  35. //當有新的客戶端接入時,會執行下面的程式碼
  36. //然後建立一個新的執行緒處理這條Socket鏈路
  37. new Thread(new ServerHandler(socket)).start();
  38. }
  39. }finally{
  40. //一些必要的清理工作
  41. if(server != null){
  42. System.out.println("伺服器已關閉。");
  43. server.close();
  44. server = null;
  45. }
  46. }
  47. }
  48. }

    客戶端訊息處理執行緒ServerHandler原始碼:

 
  1. package com.anxpp.calculator.bio;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.io.PrintWriter;
  6. import java.net.Socket;
  7. import com.anxpp.utils.Calculator;
  8. /**
  9. * 客戶端執行緒
  10. * @author yangtao__anxpp.com
  11. * 用於處理一個客戶端的Socket鏈路
  12. */
  13. public class ServerHandler implements Runnable{
  14. private Socket socket;
  15. public ServerHandler(Socket socket) {
  16. this.socket = socket;
  17. }
  18. @Override
  19. public void run() {
  20. BufferedReader in = null;
  21. PrintWriter out = null;
  22. try{
  23. in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  24. out = new PrintWriter(socket.getOutputStream(),true);
  25. String expression;
  26. String result;
  27. while(true){
  28. //通過BufferedReader讀取一行
  29. //如果已經讀到輸入流尾部,返回null,退出迴圈
  30. //如果得到非空值,就嘗試計算結果並返回
  31. if((expression = in.readLine())==null) break;
  32. System.out.println("伺服器收到訊息:" + expression);
  33. try{
  34. result = Calculator.cal(expression).toString();
  35. }catch(Exception e){
  36. result = "計算錯誤:" + e.getMessage();
  37. }
  38. out.println(result);
  39. }
  40. }catch(Exception e){
  41. e.printStackTrace();
  42. }finally{
  43. //一些必要的清理工作
  44. if(in != null){
  45. try {
  46. in.close();
  47. } catch (IOException e) {
  48. e.printStackTrace();
  49. }
  50. in = null;
  51. }
  52. if(out != null){
  53. out.close();
  54. out = null;
  55. }
  56. if(socket != null){
  57. try {
  58. socket.close();
  59. } catch (IOException e) {
  60. e.printStackTrace();
  61. }
  62. socket = null;
  63. }
  64. }
  65. }
  66. }

    同步阻塞式I/O建立的Client原始碼:

 
  1. package com.anxpp.calculator.bio;
  2. import java.io.BufferedReader;
  3. import java.io.IOException;
  4. import java.io.InputStreamReader;
  5. import java.io.PrintWriter;
  6. import java.net.Socket;
  7. /**
  8. * 阻塞式I/O建立的客戶端
  9. * @author yangtao__anxpp.com
  10. * @version 1.0
  11. */
  12. public class Client {
  13. //預設的埠號
  14. private static int DEFAULT_SERVER_PORT = 12345;
  15. private static String DEFAULT_SERVER_IP = "127.0.0.1";
  16. public static void send(String expression){
  17. send(DEFAULT_SERVER_PORT,expression);
  18. }
  19. public static void send(int port,String expression){
  20. System.out.println("算術表示式為:" + expression);
  21. Socket socket = null;
  22. BufferedReader in = null;
  23. PrintWriter out = null;
  24. try{
  25. socket = new Socket(DEFAULT_SERVER_IP,port);
  26. in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  27. out = new PrintWriter(socket.getOutputStream(),true);
  28. out.println(expression);
  29. System.out.println("___結果為:" + in.readLine());
  30. }catch(Exception e){
  31. e.printStackTrace();
  32. }finally{
  33. //一下必要的清理工作
  34. if(in != null){
  35. try {
  36. in.close();
  37. } catch (IOException e) {
  38. e.printStackTrace();
  39. }
  40. in = null;
  41. }
  42. if(out != null){
  43. out.close();
  44. out = null;
  45. }
  46. if(socket != null){
  47. try {
  48. socket.close();
  49. } catch (IOException e) {
  50. e.printStackTrace();
  51. }
  52. socket = null;
  53. }
  54. }
  55. }
  56. }

    測試程式碼,為了方便在控制檯看輸出結果,放到同一個程式(jvm)中執行:

 
  1. package com.anxpp.calculator.bio;
  2. import java.io.IOException;
  3. import java.util.Random;
  4. /**
  5. * 測試方法
  6. * @author yangtao__anxpp.com
  7. * @version 1.0
  8. */
  9. public class Test {
  10. //測試主方法
  11. public static void main(String[] args) throws InterruptedException {
  12. //執行伺服器
  13. new Thread(new Runnable() {
  14. @Override
  15. public void run() {
  16. try {
  17. Server.start();
  18. } catch (IOException e) {
  19. e.printStackTrace();
  20. }
  21. }
  22. }).start();
  23. //避免客戶端先於伺服器啟動前執行程式碼
  24. Thread.sleep(100);
  25. //執行客戶端
  26. char operators[] = {'+','-','*','/'};
  27. Random random = new Random(System.currentTimeMillis());
  28. new Thread(new Runnable() {
  29. @SuppressWarnings("static-access")
  30. @Override
  31. public void run() {
  32. while(true){
  33. //隨機產生算術表示式
  34. String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
  35. Client.send(expression);
  36. try {
  37. Thread.currentThread().sleep(random.nextInt(1000));
  38. } catch (InterruptedException e) {
  39. e.printStackTrace();
  40. }
  41. }
  42. }
  43. }).start();
  44. }
  45. }

    其中一次的執行結果:

 
  1. 伺服器已啟動,埠號:12345
  2. 算術表示式為:4-2
  3. 伺服器收到訊息:4-2
  4. ___結果為:2
  5. 算術表示式為:5-10
  6. 伺服器收到訊息:5-10
  7. ___結果為:-5
  8. 算術表示式為:0-9
  9. 伺服器收到訊息:0-9
  10. ___結果為:-9
  11. 算術表示式為:0+6
  12. 伺服器收到訊息:0+6
  13. ___結果為:6
  14. 算術表示式為:1/6
  15. 伺服器收到訊息:1/6
  16. ___結果為:0.16666666666666666
  17. ...

    從以上程式碼,很容易看出,BIO主要的問題在於每當有一個新的客戶端請求接入時,服務端必須建立一個新的執行緒來處理這條鏈路,在需要滿足高效能、高併發的場景是沒法應用的(大量建立新的執行緒會嚴重影響伺服器效能,甚至罷工)。

    1.2、偽非同步I/O程式設計

    為了改進這種一連線一執行緒的模型,我們可以使用執行緒池來管理這些執行緒(需要了解更多請參考前面提供的文章),實現1個或多個執行緒處理N個客戶端的模型(但是底層還是使用的同步阻塞I/O),通常被稱為“偽非同步I/O模型“。

    偽非同步I/O模型圖:

    02

    實現很簡單,我們只需要將新建執行緒的地方,交給執行緒池管理即可,只需要改動剛剛的Server程式碼即可:

 
  1. package com.anxpp.calculator.bio;
  2. import java.io.IOException;
  3. import java.net.ServerSocket;
  4. import java.net.Socket;
  5. import java.util.concurrent.ExecutorService;
  6. import java.util.concurrent.Executors;
  7. /**
  8. * BIO服務端原始碼__偽非同步I/O
  9. * @author yangtao__anxpp.com
  10. * @version 1.0
  11. */
  12. public final class ServerBetter {
  13. //預設的埠號
  14. private static int DEFAULT_PORT = 12345;
  15. //單例的ServerSocket
  16. private static ServerSocket server;
  17. //執行緒池 懶漢式的單例
  18. private static ExecutorService executorService = Executors.newFixedThreadPool(60);
  19. //根據傳入引數設定監聽埠,如果沒有引數呼叫以下方法並使用預設值
  20. public static void start() throws IOException{
  21. //使用預設值
  22. start(DEFAULT_PORT);
  23. }
  24. //這個方法不會被大量併發訪問,不太需要考慮效率,直接進行方法同步就行了
  25. public synchronized static void start(int port) throws IOException{
  26. if(server != null) return;
  27. try{
  28. //通過建構函式建立ServerSocket
  29. //如果埠合法且空閒,服務端就監聽成功
  30. server = new ServerSocket(port);
  31. System.out.println("伺服器已啟動,埠號:" + port);
  32. Socket socket;
  33. //通過無線迴圈監聽客戶端連線
  34. //如果沒有客戶端接入,將阻塞在accept操作上。
  35. while(true){
  36. socket = server.accept();
  37. //當有新的客戶端接入時,會執行下面的程式碼
  38. //然後建立一個新的任務處理這條Socket鏈路,並把任務交給執行緒池管理
  39. executorService.execute(new ServerHandler(socket));
  40. }
  41. }finally{
  42. //一些必要的清理工作
  43. if(server != null){
  44. System.out.println("伺服器已關閉。");
  45. server.close();
  46. server = null;
  47. }
  48. }
  49. }
  50. }

    測試執行結果是一樣的。

    我們知道,如果使用CachedThreadPool執行緒池(不限制執行緒數量,如果不清楚請參考文首提供的文章),其實除了能自動幫我們管理執行緒(複用),看起來也就像是1:1的客戶端:執行緒數模型,而使用FixedThreadPool我們就有效的控制了執行緒的最大數量,保證了系統有限的資源的控制,實現了N:M的偽非同步I/O模型。

    但是,正因為限制了執行緒數量,如果發生大量併發請求,超過最大數量的執行緒就只能等待,直到執行緒池中的有空閒的執行緒可以被複用。而對Socket的輸入流就行讀取時,會一直阻塞,直到發生:

  •     有資料可讀
  •     可用資料以及讀取完畢
  •     發生空指標或I/O異常

    所以在讀取資料較慢時(比如資料量大、網路傳輸慢等),大量併發的情況下,其他接入的訊息,只能一直等待,這就是最大的弊端。

    而後面即將介紹的NIO,就能解決這個難題。

 

2、NIO 程式設計

    JDK 1.4中的java.nio.*包中引入新的Java I/O庫,其目的是提高速度。實際上,“舊”的I/O包已經使用NIO重新實現過,即使我們不顯式的使用NIO程式設計,也能從中受益。速度的提高在檔案I/O和網路I/O中都可能會發生,但本文只討論後者。

    2.1、簡介

    NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的(其實在JDK 1.4中就已經被引入了,但這個名詞還會繼續用很久,即使它們在現在看來已經是“舊”的了,所以也提示我們在命名時,需要好好考慮),做了很大的改變。但民間跟多人稱之為Non-block I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

    NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

    新增的著兩種通道都支援阻塞和非阻塞兩種模式。

    阻塞模式使用就像傳統中的支援一樣,比較簡單,但是效能和可靠性都不好;非阻塞模式正好與之相反。

    對於低負載、低併發的應用程式,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高併發的(網路)應用,應使用NIO的非阻塞模式來開發。

    下面會先對基礎知識進行介紹。

    2.2、緩衝區 Buffer

    Buffer是一個物件,包含一些要寫入或者讀出的資料。

    在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的;在寫入資料時,也是寫入到緩衝區中。任何時候訪問NIO中的資料,都是通過緩衝區進行操作。

    緩衝區實際上是一個數組,並提供了對資料結構化訪問以及維護讀寫位置等資訊。

    具體的快取區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的介面:Buffer。

    2.3、通道 Channel

    我們對資料的讀取和寫入要通過Channel,它就像水管一樣,是一個通道。通道不同於流的地方就是通道是雙向的,可以用於讀、寫和同時讀寫操作。

    底層的作業系統的通道一般都是全雙工的,所以全雙工的Channel比流能更好的對映底層作業系統的API。

    Channel主要分兩大類:

  •     SelectableChannel:使用者網路讀寫
  •     FileChannel:用於檔案操作

    後面程式碼會涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。

    2.4、多路複用器 Selector

    Selector是Java  NIO 程式設計的基礎。

    Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的集合,進行後續的I/O操作。

    一個Selector可以同時輪詢多個Channel,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連線控制代碼1024/2048的限制。所以,只需要一個執行緒負責Selector的輪詢,就可以接入成千上萬的客戶端。

    2.5、NIO服務端

    程式碼比傳統的Socket程式設計看起來要複雜不少。

    直接貼程式碼吧,以註釋的形式給出程式碼說明。

    NIO建立的Server原始碼:

 
  1. package com.anxpp.calculator.nio;
  2. public class Server {
  3. private static int DEFAULT_PORT = 12345;
  4. private static ServerHandle serverHandle;
  5. public static void start(){
  6. start(DEFAULT_PORT);
  7. }
  8. public static synchronized void start(int port){
  9. if(serverHandle!=null)
  10. serverHandle.stop();
  11. serverHandle = new ServerHandle(port);
  12. new Thread(serverHandle,"Server").start();
  13. }
  14. public static void main(String[] args){
  15. start();
  16. }
  17. }

    ServerHandle:

 
  1. package com.anxpp.calculator.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. import com.anxpp.utils.Calculator;
  12. /**
  13. * NIO服務端
  14. * @author yangtao__anxpp.com
  15. * @version 1.0
  16. */
  17. public class ServerHandle implements Runnable{
  18. private Selector selector;
  19. private ServerSocketChannel serverChannel;
  20. private volatile boolean started;
  21. /**
  22. * 構造方法
  23. * @param port 指定要監聽的埠號
  24. */
  25. public ServerHandle(int port) {
  26. try{
  27. //建立選擇器
  28. selector = Selector.open();
  29. //開啟監聽通道
  30. serverChannel = ServerSocketChannel.open();
  31. //如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
  32. serverChannel.configureBlocking(false);//開啟非阻塞模式
  33. //繫結埠 backlog設為1024
  34. serverChannel.socket().bind(new InetSocketAddress(port),1024);
  35. //監聽客戶端連線請求
  36. serverChannel.register(selector, SelectionKey.OP_ACCEPT);
  37. //標記伺服器已開啟
  38. started = true;
  39. System.out.println("伺服器已啟動,埠號:" + port);
  40. }catch(IOException e){
  41. e.printStackTrace();
  42. System.exit(1);
  43. }
  44. }
  45. public void stop(){
  46. started = false;
  47. }
  48. @Override
  49. public void run() {
  50. //迴圈遍歷selector
  51. while(started){
  52. try{
  53. //無論是否有讀寫事件發生,selector每隔1s被喚醒一次
  54. selector.select(1000);
  55. //阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
  56. // selector.select();
  57. Set<SelectionKey> keys = selector.selectedKeys();
  58. Iterator<SelectionKey> it = keys.iterator();
  59. SelectionKey key = null;
  60. while(it.hasNext()){
  61. key = it.next();
  62. it.remove();
  63. try{
  64. handleInput(key);
  65. }catch(Exception e){
  66. if(key != null){
  67. key.cancel();
  68. if(key.channel() != null){
  69. key.channel().close();
  70. }
  71. }
  72. }
  73. }
  74. }catch(Throwable t){
  75. t.printStackTrace();
  76. }
  77. }
  78. //selector關閉後會自動釋放裡面管理的資源
  79. if(selector != null)
  80. try{
  81. selector.close();
  82. }catch (Exception e) {
  83. e.printStackTrace();
  84. }
  85. }
  86. private void handleInput(SelectionKey key) throws IOException{
  87. if(key.isValid()){
  88. //處理新接入的請求訊息
  89. if(key.isAcceptable()){
  90. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
  91. //通過ServerSocketChannel的accept建立SocketChannel例項
  92. //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
  93. SocketChannel sc = ssc.accept();
  94. //設定為非阻塞的
  95. sc.configureBlocking(false);
  96. //註冊為讀
  97. sc.register(selector, SelectionKey.OP_READ);
  98. }
  99. //讀訊息
  100. if(key.isReadable()){
  101. SocketChannel sc = (SocketChannel) key.channel();
  102. //建立ByteBuffer,並開闢一個1M的緩衝區
  103. ByteBuffer buffer = ByteBuffer.allocate(1024);
  104. //讀取請求碼流,返回讀取到的位元組數
  105. int readBytes = sc.read(buffer);
  106. //讀取到位元組,對位元組進行編解碼
  107. if(readBytes>0){
  108. //將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作
  109. buffer.flip();
  110. //根據緩衝區可讀位元組數建立位元組陣列
  111. byte[] bytes = new byte[buffer.remaining()];
  112. //將緩衝區可讀位元組陣列複製到新建的陣列中
  113. buffer.get(bytes);
  114. String expression = new String(bytes,"UTF-8");
  115. System.out.println("伺服器收到訊息:" + expression);
  116. //處理資料
  117. String result = null;
  118. try{
  119. result = Calculator.cal(expression).toString();
  120. }catch(Exception e){
  121. result = "計算錯誤:" + e.getMessage();
  122. }
  123. //傳送應答訊息
  124. doWrite(sc,result);
  125. }
  126. //沒有讀取到位元組 忽略
  127. // else if(readBytes==0);
  128. //鏈路已經關閉,釋放資源
  129. else if(readBytes<0){
  130. key.cancel();
  131. sc.close();
  132. }
  133. }
  134. }
  135. }
  136. //非同步傳送應答訊息
  137. private void doWrite(SocketChannel channel,String response) throws IOException{
  138. //將訊息編碼為位元組陣列
  139. byte[] bytes = response.getBytes();
  140. //根據陣列容量建立ByteBuffer
  141. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  142. //將位元組陣列複製到緩衝區
  143. writeBuffer.put(bytes);
  144. //flip操作
  145. writeBuffer.flip();
  146. //傳送緩衝區的位元組陣列
  147. channel.write(writeBuffer);
  148. //****此處不含處理“寫半包”的程式碼
  149. }
  150. }

    可以看到,建立NIO服務端的主要步驟如下:

  1.     開啟ServerSocketChannel,監聽客戶端連線
  2.     繫結監聽埠,設定連線為非阻塞模式
  3.     建立Reactor執行緒,建立多路複用器並啟動執行緒
  4.     將ServerSocketChannel註冊到Reactor執行緒中的Selector上,監聽ACCEPT事件
  5.     Selector輪詢準備就緒的key
  6.     Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
  7.     設定客戶端鏈路為非阻塞模式
  8.     將新接入的客戶端連線註冊到Reactor執行緒的Selector上,監聽讀操作,讀取客戶端傳送的網路訊息
  9.     非同步讀取客戶端訊息到緩衝區
  10.     對Buffer編解碼,處理半包訊息,將解碼成功的訊息封裝成Task
  11.     將應答訊息編碼為Buffer,呼叫SocketChannel的write將訊息非同步傳送給客戶端

    因為應答訊息的傳送,SocketChannel也是非同步非阻塞的,所以不能保證一次能吧需要傳送的資料傳送完,此時就會出現寫半包的問題。我們需要註冊寫操作,不斷輪詢Selector將沒有傳送完的訊息傳送完畢,然後通過Buffer的hasRemain()方法判斷訊息是否傳送完成。

    2.6、NIO客戶端

    還是直接上程式碼吧,過程也不需要太多解釋了,跟服務端程式碼有點類似:

 
  1. package com.anxpp.calculator.nio;
  2. public class Client {
  3. private static String DEFAULT_HOST = "127.0.0.1";
  4. private static int DEFAULT_PORT = 12345;
  5. private static ClientHandle clientHandle;
  6. public static void start(){
  7. start(DEFAULT_HOST,DEFAULT_PORT);
  8. }
  9. public static synchronized void start(String ip,int port){
  10. if(clientHandle!=null)
  11. clientHandle.stop();
  12. clientHandle = new ClientHandle(ip,port);
  13. new Thread(clientHandle,"Server").start();
  14. }
  15. //向伺服器傳送訊息
  16. public static boolean sendMsg(String msg) throws Exception{
  17. if(msg.equals("q")) return false;
  18. clientHandle.sendMsg(msg);
  19. return true;
  20. }
  21. public static void main(String[] args){
  22. start();
  23. }
  24. }

    ClientHandle:

 
  1. package com.anxpp.calculator.nio;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.SocketChannel;
  8. import java.util.Iterator;
  9. import java.util.Set;
  10. /**
  11. * NIO客戶端
  12. * @author yangtao__anxpp.com
  13. * @version 1.0
  14. */
  15. public class ClientHandle implements Runnable{
  16. private String host;
  17. private int port;
  18. private Selector selector;
  19. private SocketChannel socketChannel;
  20. private volatile boolean started;
  21. public ClientHandle(String ip,int port) {
  22. this.host = ip;
  23. this.port = port;
  24. try{
  25. //建立選擇器
  26. selector = Selector.open();
  27. //開啟監聽通道
  28. socketChannel = SocketChannel.open();
  29. //如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
  30. socketChannel.configureBlocking(false);//開啟非阻塞模式
  31. started = true;
  32. }catch(IOException e){
  33. e.printStackTrace();
  34. System.exit(1);
  35. }
  36. }
  37. @Override
  38. public void run() {
  39. try{
  40. doConnect();
  41. }catch(IOException e){
  42. e.printStackTrace();
  43. System.exit(1);
  44. }
  45. //迴圈遍歷selector
  46. while(started){
  47. try{
  48. //無論是否有讀寫事件發生,selector每隔1s被喚醒一次
  49. selector.select(1000);
  50. //阻塞,只有當至少一個註冊的事件發生的時候才會繼續.
  51. // selector.select();
  52. Set<SelectionKey> keys = selector.selectedKeys();
  53. Iterator<SelectionKey> it = keys.iterator();
  54. SelectionKey key = null;
  55. while(it.hasNext()){
  56. key = it.next();
  57. it.remove();
  58. try{
  59. handleInput(key);
  60. }catch(Exception e){
  61. if(key != null){
  62. key.cancel();
  63. if(key.channel() != null){
  64. key.channel().close();
  65. }
  66. }
  67. }
  68. }
  69. }catch(Exception e){
  70. e.printStackTrace();
  71. System.exit(1);
  72. }
  73. }
  74. //selector關閉後會自動釋放裡面管理的資源
  75. if(selector != null)
  76. try{
  77. selector.close();
  78. }catch (Exception e) {
  79. e.printStackTrace();
  80. }
  81. }
  82. private void handleInput(SelectionKey key) throws IOException{
  83. if(key.isValid()){
  84. SocketChannel sc = (SocketChannel) key.channel();
  85. if(key.isConnectable()){
  86. if(sc.finishConnect());
  87. else System.exit(1);
  88. }
  89. //讀訊息
  90. if(key.isReadable()){
  91. //建立ByteBuffer,並開闢一個1M的緩衝區
  92. ByteBuffer buffer = ByteBuffer.allocate(1024);
  93. //讀取請求碼流,返回讀取到的位元組數
  94. int readBytes = sc.read(buffer);
  95. //讀取到位元組,對位元組進行編解碼
  96. if(readBytes>0){
  97. //將緩衝區當前的limit設定為position=0,用於後續對緩衝區的讀取操作
  98. buffer.flip();
  99. //根據緩衝區可讀位元組數建立位元組陣列
  100. byte[] bytes = new byte[buffer.remaining()];
  101. //將緩衝區可讀位元組陣列複製到新建的陣列中
  102. buffer.get(bytes);
  103. String result = new String(bytes,"UTF-8");
  104. System.out.println("客戶端收到訊息:" + result);
  105. }
  106. //沒有讀取到位元組 忽略
  107. // else if(readBytes==0);
  108. //鏈路已經關閉,釋放資源
  109. else if(readBytes<0){
  110. key.cancel();
  111. sc.close();
  112. }
  113. }
  114. }
  115. }
  116. //非同步傳送訊息
  117. private void doWrite(SocketChannel channel,String request) throws IOException{
  118. //將訊息編碼為位元組陣列
  119. byte[] bytes = request.getBytes();
  120. //根據陣列容量建立ByteBuffer
  121. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  122. //將位元組陣列複製到緩衝區
  123. writeBuffer.put(bytes);
  124. //flip操作
  125. writeBuffer.flip();
  126. //傳送緩衝區的位元組陣列
  127. channel.write(writeBuffer);
  128. //****此處不含處理“寫半包”的程式碼
  129. }
  130. private void doConnect() throws IOException{
  131. if(socketChannel.connect(new InetSocketAddress(host,port)));
  132. else socketChannel.register(selector, SelectionKey.OP_CONNECT);
  133. }
  134. public void sendMsg(String msg) throws Exception{
  135. socketChannel.register(selector, SelectionKey.OP_READ);
  136. doWrite(socketChannel, msg);
  137. }
  138. }

    2.7、演示結果

    首先執行伺服器,順便也執行一個客戶端:

 
  1. /**
  2. * 測試方法
  3. * @author yangtao__anxpp.com
  4. * @version 1.0
  5. */
  6. public class Test {
  7. //測試主方法
  8. @SuppressWarnings("resource")
  9. public static void main(String[] args) throws Exception{
  10. //執行伺服器
  11. Server.start();
  12. //避免客戶端先於伺服器啟動前執行程式碼
  13. Thread.sleep(100);
  14. //執行客戶端
  15. Client.start();
  16. while(Client.sendMsg(new Scanner(System.in).nextLine()));
  17. }
  18. }

    我們也可以單獨執行客戶端,效果都是一樣的。

    一次測試的結果:

 
  1. 伺服器已啟動,埠號:12345
  2. 1+2+3+4+5+6
  3. 伺服器收到訊息:1+2+3+4+5+6
  4. 客戶端收到訊息:21
  5. 1*2/3-4+5*6/7-8
  6. 伺服器收到訊息:1*2/3-4+5*6/7-8
  7. 客戶端收到訊息:-7.0476190476190474

    執行多個客戶端,都是沒有問題的。

 

3、AIO程式設計

    NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。

    非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非同步讀寫,從而簡化了NIO的程式設計模型。

    直接上程式碼吧。

    3.1、Server端程式碼

    Server:

 
  1. package com.anxpp.io.calculator.aio.server;
  2. /**
  3. * AIO服務端
  4. * @author yangtao__anxpp.com
  5. * @version 1.0
  6. */
  7. public class Server {
  8. private static int DEFAULT_PORT = 12345;
  9. private static AsyncServerHandler serverHandle;
  10. public volatile static long clientCount = 0;
  11. public static void start(){
  12. start(DEFAULT_PORT);
  13. }
  14. public static synchronized void start(int port){
  15. if(serverHandle!=null)
  16. return;
  17. serverHandle = new AsyncServerHandler(port);
  18. new Thread(serverHandle,"Server").start();
  19. }
  20. public static void main(String[] args){
  21. Server.start();
  22. }
  23. }

    AsyncServerHandler:

 
  1. package com.anxpp.io.calculator.aio.server;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.channels.AsynchronousServerSocketChannel;
  5. import java.util.concurrent.CountDownLatch;
  6. public class AsyncServerHandler implements Runnable {
  7. public CountDownLatch latch;
  8. public AsynchronousServerSocketChannel channel;
  9. public AsyncServerHandler(int port) {
  10. try {
  11. //建立服務端通道
  12. channel = AsynchronousServerSocketChannel.open();
  13. //繫結埠
  14. channel.bind(new InetSocketAddress(port));
  15. System.out.println("伺服器已啟動,埠號:" + port);
  16. } catch (IOException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. @Override
  21. public void run() {
  22. //CountDownLatch初始化
  23. //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
  24. //此處,讓現場在此阻塞,防止服務端執行完成後退出
  25. //也可以使用while(true)+sleep
  26. //生成環境就不需要擔心這個問題,以為服務端是不會退出的
  27. latch = new CountDownLatch(1);
  28. //用於接收客戶端的連線
  29. channel.accept(this,new AcceptHandler());
  30. try {
  31. latch.await();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }

    AcceptHandler:

 
  1. package com.anxpp.io.calculator.aio.server;
  2. import java.nio.ByteBuffer;
  3. import java.nio.channels.AsynchronousSocketChannel;
  4. import java.nio.channels.CompletionHandler;
  5. //作為handler接收客戶端連線
  6. public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
  7. @Override
  8. public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
  9. //繼續接受其他客戶端的請求
  10. Server.clientCount++;
  11. System.out.println("連線的客戶端數:" + Server.clientCount);
  12. serverHandler.channel.accept(serverHandler, this);
  13. //建立新的Buffer
  14. ByteBuffer buffer = ByteBuffer.allocate(1024);
  15. //非同步讀 第三個引數為接收訊息回撥的業務Handler
  16. channel.read(buffer, buffer, new ReadHandler(channel));
  17. }
  18. @Override
  19. public void failed(Throwable exc, AsyncServerHandler serverHandler) {
  20. exc.printStackTrace();
  21. serverHandler.latch.countDown();
  22. }
  23. }

    ReadHandler:

 
  1. package com.anxpp.io.calculator.aio.server;
  2. import java.io.IOException;
  3. import java.io.UnsupportedEncodingException;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.nio.channels.CompletionHandler;
  7. import com.anxpp.io.utils.Calculator;
  8. public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
  9. //用於讀取半包訊息和傳送應答
  10. private AsynchronousSocketChannel channel;
  11. public ReadHandler(AsynchronousSocketChannel channel) {
  12. this.channel = channel;
  13. }
  14. //讀取到訊息後的處理
  15. @Override
  16. public void completed(Integer result, ByteBuffer attachment) {
  17. //flip操作
  18. attachment.flip();
  19. //根據
  20. byte[] message = new byte[attachment.remaining()];
  21. attachment.get(message);
  22. try {
  23. String expression = new String(message, "UTF-8");
  24. System.out.println("伺服器收到訊息: " + expression);
  25. String calrResult = null;
  26. try{
  27. calrResult = Calculator.cal(expression).toString();
  28. }catch(Exception e){
  29. calrResult = "計算錯誤:" + e.getMessage();
  30. }
  31. //向客戶端傳送訊息
  32. doWrite(calrResult);
  33. } catch (UnsupportedEncodingException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. //傳送訊息
  38. private void doWrite(String result) {
  39. byte[] bytes = result.getBytes();
  40. ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  41. writeBuffer.put(bytes);
  42. writeBuffer.flip();
  43. //非同步寫資料 引數與前面的read一樣
  44. channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
  45. @Override
  46. public void completed(Integer result, ByteBuffer buffer) {
  47. //如果沒有傳送完,就繼續傳送直到完成
  48. if (buffer.hasRemaining())
  49. channel.write(buffer, buffer, this);
  50. else{
  51. //建立新的Buffer
  52. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  53. //非同步讀 第三個引數為接收訊息回撥的業務Handler
  54. channel.read(readBuffer, readBuffer, new ReadHandler(channel));
  55. }
  56. }
  57. @Override
  58. public void failed(Throwable exc, ByteBuffer attachment) {
  59. try {
  60. channel.close();
  61. } catch (IOException e) {
  62. }
  63. }
  64. });
  65. }
  66. @Override
  67. public void failed(Throwable exc, ByteBuffer attachment) {
  68. try {
  69. this.channel.close();
  70. } catch (IOException e) {
  71. e.printStackTrace();
  72. }
  73. }
  74. }

    OK,這樣就已經完成了,其實說起來也簡單,我感覺API比NIO的使用起來真的簡單多了,主要就是監聽、讀、寫等各種CompletionHandler。此處本應有一個WriteHandler的,確實,我們在ReadHandler中,以一個匿名內部類實現了它。

    下面看客戶端程式碼。

    3.2、Client端程式碼

    Client:

 
  1. package com.anxpp.io.calculator.aio.client;
  2. import java.util.Scanner;
  3. public class Client {
  4. private static String DEFAULT_HOST = "127.0.0.1";
  5. private static int DEFAULT_PORT = 12345;
  6. private static AsyncClientHandler clientHandle;
  7. public static void start(){
  8. start(DEFAULT_HOST,DEFAULT_PORT);
  9. }
  10. public static synchronized void start(String ip,int port){
  11. if(clientHandle!=null)
  12. return;
  13. clientHandle = new AsyncClientHandler(ip,port);
  14. new Thread(clientHandle,"Client").start();
  15. }
  16. //向伺服器傳送訊息
  17. public static boolean sendMsg(String msg) throws Exception{
  18. if(msg.equals("q")) return false;
  19. clientHandle.sendMsg(msg);
  20. return true;
  21. }
  22. @SuppressWarnings("resource")
  23. public static void main(String[] args) throws Exception{
  24. Client.start();
  25. System.out.println("請輸入請求訊息:");
  26. Scanner scanner = new Scanner(System.in);
  27. while(Client.sendMsg(scanner.nextLine()));
  28. }
  29. }

    AsyncClientHandler:

 
  1. package com.anxpp.io.calculator.aio.client;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.nio.channels.CompletionHandler;
  7. import java.util.concurrent.CountDownLatch;
  8. public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
  9. private AsynchronousSocketChannel clientChannel;
  10. private String host;
  11. private int port;
  12. private CountDownLatch latch;
  13. public AsyncClientHandler(String host, int port) {
  14. this.host = host;
  15. this.port = port;
  16. try {
  17. //建立非同步的客戶端通道
  18. clientChannel = AsynchronousSocketChannel.open();
  19. } catch (IOException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. @Override
  24. public void run() {
  25. //建立CountDownLatch等待
  26. latch = new CountDownLatch(1);
  27. //發起非同步連線操作,回撥引數就是這個類本身,如果連線成功會回撥completed方法
  28. clientChannel.connect(new InetSocketAddress(host, port), this, this);
  29. try {
  30. latch.await();
  31. } catch (InterruptedException e1) {
  32. e1.printStackTrace();
  33. }
  34. try {
  35. clientChannel.close();
  36. } catch (IOException e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. //連線伺服器成功
  41. //意味著TCP三次握手完成
  42. @Override
  43. public void completed(Void result, AsyncClientHandler attachment) {
  44. System.out.println("客戶端成功連線到伺服器...");
  45. }
  46. //連線伺服器失敗
  47. @Override
  48. public void failed(Throwable exc, AsyncClientHandler attachment) {
  49. System.err.println("連線伺服器失敗...");
  50. exc.printStackTrace();
  51. try {
  52. clientChannel.close();
  53. latch.countDown();
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. //向伺服器傳送訊息
  59. public void sendMsg(String msg){
  60. byte[] req = msg.getBytes();
  61. ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
  62. writeBuffer.put(req);
  63. writeBuffer.flip();
  64. //非同步寫
  65. clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
  66. }
  67. }

    WriteHandler:

 
  1. package com.anxpp.io.calculator.aio.client;
  2. import java.io.IOException;
  3. import java.nio.ByteBuffer;
  4. import java.nio.channels.AsynchronousSocketChannel;
  5. import java.nio.channels.CompletionHandler;
  6. import java.util.concurrent.CountDownLatch;
  7. public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
  8. private AsynchronousSocketChannel clientChannel;
  9. private CountDownLatch latch;
  10. public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
  11. this.clientChannel = clientChannel;
  12. this.latch = latch;
  13. }
  14. @Override
  15. public void completed(Integer result, ByteBuffer buffer) {
  16. //完成全部資料的寫入
  17. if (buffer.hasRemaining()) {
  18. clientChannel.write(buffer, buffer, this);
  19. }
  20. else {
  21. //讀取資料
  22. ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  23. clientChannel.read(readBuffer,readBuffer,new ReadHandler(clientChannel, latch));
  24. }
  25. }
  26. @Override
  27. public void failed(Throwable exc, ByteBuffer attachment) {
  28. System.err.println("資料傳送失敗...");
  29. try {
  30. clientChannel.close();
  31. latch.countDown();
  32. } catch (IOException e) {
  33. }
  34. }
  35. }

    ReadHandler:

 
  1. package com.anxpp.io.calculator.aio.client;
  2. import java.io.IOException;
  3. import java.io.UnsupportedEncodingException;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.AsynchronousSocketChannel;
  6. import java.nio.channels.CompletionHandler;
  7. import java.util.concurrent.CountDownLatch;
  8. public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
  9. private AsynchronousSocketChannel clientChannel;
  10. private CountDownLatch latch;
  11. public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
  12. this.clientChannel = clientChannel;
  13. this.latch = latch;
  14. }
  15. @Override
  16. public void completed(Integer result,ByteBuffer buffer) {
  17. buffer.flip();
  18. byte[] bytes = new byte[buffer.remaining()];
  19. buffer.get(bytes);
  20. String body;
  21. try {
  22. body = new String(bytes,"UTF-8");
  23. System.out.println("客戶端收到結果:"+ body);
  24. } catch (UnsupportedEncodingException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. @Override
  29. public void failed(Throwable exc,ByteBuffer attachment) {
  30. System.err.println("資料讀取失敗...");
  31. try {
  32. clientChannel.close();
  33. latch.countDown();
  34. } catch (IOException e) {
  35. }
  36. }
  37. }

    這個API使用起來真的是很順手。

    3.3、測試

    Test:

 
  1. package com.anxpp.io.calculator.aio;
  2. import java.util.Scanner;
  3. import com.anxpp.io.calculator.aio.client.Client;
  4. import com.anxpp.io.calculator.aio.server.Server;
  5. /**
  6. * 測試方法
  7. * @author yangtao__anxpp.com
  8. * @version 1.0
  9. */
  10. public class Test {
  11. //測試主方法
  12. @SuppressWarnings("resource")
  13. public static void main(String[] args) throws Exception{
  14. //執行伺服器
  15. Server.start();
  16. //避免客戶端先於伺服器啟動前執行程式碼
  17. Thread.sleep(100);
  18. //執行客戶端
  19. Client.start();
  20. System.out.println("請輸入請求訊息:");
  21. Scanner scanner = new Scanner(System.in);
  22. while(Client.sendMsg(scanner.nextLine()));
  23. }
  24. }

    我們可以在控制檯輸入我們需要計算的算數字符串,伺服器就會返回結果,當然,我們也可以執行大量的客戶端,都是沒有問題的,以為此處設計為單例客戶端,所以也就沒有演示大量客戶端併發。

    讀者可以自己修改Client類,然後開闢大量執行緒,並使用構造方法建立很多的客戶端測試。

    下面是其中一次引數的輸出:

 
  1. 伺服器已啟動,埠號:12345
  2. 請輸入請求訊息:
  3. 客戶端成功連線到伺服器...
  4. 連線的客戶端數:1
  5. 123456+789+456
  6. 伺服器收到訊息: 123456+789+456
  7. 客戶端收到結果:124701
  8. 9526*56
  9. 伺服器收到訊息: 9526*56
  10. 客戶端收到結果:533456
  11. ...

    AIO是真正的非同步非阻塞的,所以,在面對超級大量的客戶端,更能得心應手。

    下面就比較一下,幾種I/O程式設計的優缺點。

 

4、各種I/O的對比

    先以一張表來直觀的對比一下:

    03

    具體選擇什麼樣的模型或者NIO框架,完全基於業務的實際應用場景和效能需求,如果客戶端很少,伺服器負荷不重,就沒有必要選擇開發起來相對不那麼簡單的NIO做服務端;相反,就應考慮使用NIO或者相關的框架了。

 

5、附錄

    上文中服務端使用到的用於計算的工具類:

 
  1. package com.anxpp.utils;
  2. import javax.script.ScriptEngine;
  3. import javax.script.ScriptEngineManager;
  4. import javax.script.ScriptException;
  5. public final class Calculator {
  6. private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
  7. public static Object cal(String expression) throws ScriptException{
  8. return jse.eval(expression);
  9. }
  10. }