1. 程式人生 > 程式設計 >Java BIO,NIO,AIO總結

Java BIO,NIO,AIO總結

Java 中的 BIO、NIO和 AIO 理解為是 Java 語言對作業系統的各種 IO 模型的封裝。程式設計師在使用這些 API 的時候,不需要關心作業系統層面的知識,也不需要根據不同作業系統編寫不同的程式碼。只需要使用Java的API就可以了。

在講 BIO,NIO,AIO 之前先來回顧一下這樣幾個概念:同步與非同步,阻塞與非阻塞。 同步與非同步

  • 同步: 同步就是發起一個呼叫後,被呼叫者未處理完請求之前,呼叫不返回。
  • 非同步: 非同步就是發起一個呼叫後,立刻得到被呼叫者的迴應表示已接收到請求,但是被呼叫者並沒有返回結果,此時我們可以處理其他的請求,被呼叫者通常依靠事件,回撥等機制來通知呼叫者其返回結果。 同步和非同步的區別最大在於非同步的話呼叫者不需要等待處理結果,被呼叫者會通過回撥等機制來通知呼叫者其返回結果。

阻塞和非阻塞

  • 阻塞: 阻塞就是發起一個請求,呼叫者一直等待請求結果返回,也就是當前執行緒會被掛起,無法從事其他任務,只有當條件就緒才能繼續。
  • 非阻塞: 非阻塞就是發起一個請求,呼叫者不用一直等著結果返回,可以先去幹其他事情。 舉個生活中簡單的例子,你媽媽讓你燒水,小時候你比較笨啊,在那裡傻等著水開(同步阻塞)。等你稍微再長大一點,你知道每次燒水的空隙可以去幹點其他事,然後只需要時不時來看看水開了沒有(同步非阻塞)。後來,你們家用上了水開了會發出聲音的壺,這樣你就只需要聽到響聲後就知道水開了,在這期間你可以隨便幹自己的事情,你需要去倒水了(非同步非阻塞)。

BIO (Blocking I/O)

同步阻塞I/O模式,資料的讀取寫入必須阻塞在一個執行緒內等待其完成。

傳統 BIO

BIO通訊(一請求一應答)模型圖如下(圖源網路,原出處不明):

Java BIO,AIO總結

採用 BIO 通訊模型 的服務端,通常由一個獨立的 Acceptor 執行緒負責監聽客戶端的連線。我們一般通過在while(true) 迴圈中服務端會呼叫 accept() 方法等待接收客戶端的連線的方式監聽請求,請求一旦接收到一個連線請求,就可以建立通訊套接字在這個通訊套接字上進行讀寫操作,此時不能再接收其他客戶端連線請求,只能等待同當前連線的客戶端的操作執行完成, 不過可以通過多執行緒來支援多個客戶端的連線,如上圖所示。

如果要讓 BIO 通訊模型 能夠同時處理多個客戶端請求,就必須使用多執行緒(主要原因是socket.accept()、socket.read()、socket.write() 涉及的三個主要函式都是同步阻塞的),也就是說它在接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理,處理完成之後,通過輸出流返回應答給客戶端,執行緒銷燬。這就是典型的 一請求一應答通訊模型 。我們可以設想一下如果這個連線不做任何事情的話就會造成不必要的執行緒開銷,不過可以通過 執行緒池機制 改善,執行緒池還可以讓執行緒的建立和回收成本相對較低。使用FixedThreadPool 可以有效的控制了執行緒的最大數量,保證了系統有限的資源的控制,實現了N(客戶端請求數量):M(處理客戶端請求的執行緒數量)的偽非同步I/O模型(N 可以遠遠大於 M),下面一節"偽非同步 BIO"中會詳細介紹到。

我們再設想一下當客戶端併發訪問量增加後這種模型會出現什麼問題?

在 Java 虛擬機器中,執行緒是寶貴的資源,執行緒的建立和銷燬成本很高,除此之外,執行緒的切換成本也是很高的。尤其在 Linux 這樣的作業系統中,執行緒本質上就是一個程序,建立和銷燬執行緒都是重量級的系統函式。如果併發訪問量增加會導致執行緒數急劇膨脹可能會導致執行緒堆疊溢位、建立新執行緒失敗等問題,最終導致程序宕機或者僵死,不能對外提供服務。

偽非同步 IO

為了解決同步阻塞I/O面臨的一個鏈路需要一個執行緒處理的問題,後來有人對它的執行緒模型進行了優化一一一後端通過一個執行緒池來處理多個客戶端的請求接入,形成客戶端個數M:執行緒池最大執行緒數N的比例關係,其中M可以遠遠大於N.通過執行緒池可以靈活地調配執行緒資源,設定執行緒的最大值,防止由於海量併發接入導致執行緒耗盡。

偽非同步IO模型圖(圖源網路,原出處不明):

Java BIO,AIO總結

採用執行緒池和任務佇列可以實現一種叫做偽非同步的 I/O 通訊框架,它的模型圖如上圖所示。當有新的客戶端接入時,將客戶端的 Socket 封裝成一個Task(該任務實現java.lang.Runnable介面)投遞到後端的執行緒池中進行處理,JDK 的執行緒池維護一個訊息佇列和 N 個活躍執行緒,對訊息佇列中的任務進行處理。由於執行緒池可以設定訊息佇列的大小和最大執行緒數,因此,它的資源佔用是可控的,無論多少個客戶端併發訪問,都不會導致資源的耗盡和宕機。

偽非同步I/O通訊框架採用了執行緒池實現,因此避免了為每個請求都建立一個獨立執行緒造成的執行緒資源耗盡問題。不過因為它的底層仍然是同步阻塞的BIO模型,因此無法從根本上解決問題。

程式碼示例

下面程式碼中演示了BIO通訊(一請求一應答)模型。我們會在客戶端建立多個執行緒依次連線服務端並向其傳送"當前時間+:hello world",服務端會為每個客戶端執行緒建立一個執行緒來處理。程式碼示例出自閃電俠的部落格,原地址如下:

客戶端

/**
 * 
 * @author 閃電俠
 * @date 2018年10月14日
 * @Description:客戶端
 */
public class IOClient {

 public static void main(String[] args) {
  // TODO 建立多個執行緒,模擬多個客戶端連線服務端
  new Thread(() -> {
   try {
    Socket socket = new Socket("127.0.0.1",3333);
    while (true) {
     try {
      socket.getOutputStream().write((new Date() + ": hello world").getBytes());
      Thread.sleep(2000);
     } catch (Exception e) {
     }
    }
   } catch (IOException e) {
   }
  }).start();

 }

}

服務端

/**
 * @author 閃電俠
 * @date 2018年10月14日
 * @Description: 服務端
 */
public class IOServer {

 public static void main(String[] args) throws IOException {
  // TODO 服務端處理客戶端連線請求
  ServerSocket serverSocket = new ServerSocket(3333);

  // 接收到客戶端連線請求之後為每個客戶端建立一個新的執行緒進行鏈路處理
  new Thread(() -> {
   while (true) {
    try {
     // 阻塞方法獲取新的連線
     Socket socket = serverSocket.accept();

     // 每一個新的連線都建立一個執行緒,負責讀取資料
     new Thread(() -> {
      try {
       int len;
       byte[] data = new byte[1024];
       InputStream inputStream = socket.getInputStream();
       // 按位元組流方式讀取資料
       while ((len = inputStream.read(data)) != -1) {
        System.out.println(new String(data,len));
       }
      } catch (IOException e) {
      }
     }).start();

    } catch (IOException e) {
    }

   }
  }).start();

 }

}

總結

在活動連線數不是特別高(小於單機1000)的情況下,這種模型是比較不錯的,可以讓每一個連線專注於自己的 I/O 並且程式設計模型簡單,也不用過多考慮系統的過載、限流等問題。執行緒池本身就是一個天然的漏斗,可以緩衝一些系統處理不了的連線或請求。但是,當面對十萬甚至百萬級連線的時候,傳統的 BIO 模型是無能為力的。因此,我們需要一種更高效的 I/O 處理模型來應對更高的併發量。

NIO (no blocking io 也叫 new io)

NIO 即非阻塞IO,是JDK 1.4 更新的api, 核心內容是 將建立連線、資料可讀、可寫等事件交給了作業系統來維護, 通過呼叫作業系統的 api (如:select、epoll等),來判斷當前是否支援:可讀、可寫,如果當前不可操作,那麼直接返回,從而實現了非阻塞。 而不需要像 BIO 那樣每次去輪詢等待連線的建立以及資料的準備是否完成。主要核心的模組分以下幾類:

1. 緩衝區Buffer

一個特定基類(byte、short、int、long 等)的資料容器,用作在建立socket 連線之後的資料傳輸。
通過 capacity,limit,position,mark 指標來實現資料的讀寫

get()、put() 方法為每個子類都具有的讀、寫資料的api方法,當從當前的 position 讀或寫的同時,position會增加 相應讀寫的資料的長度。當position 達到limit 之後,再次 get、put則會丟擲異常

2. Channel 連線通道

一個 channel 代表一個與“實體”的連線通道,如:硬體裝置、檔案、網路 socket 。通過連線通道可以使得客戶端-伺服器互相傳輸資料,因此通道也是全雙工的(因為是建立在TCP 傳輸層的協議上,因此具備全雙工的能力)。

JDK 中 channel 可以分為以下幾類:

SelectableChannel 用於 阻塞和非阻塞 socket 連線的通道
FileChannel 用於檔案操作,包括:reading,writing,mapping,and manipulating a file

3.Selector 多路複用選擇器

用於 SelectableChannel 的多路複用器,當使用非阻塞的 socket 時,需要將監聽的通道 SelectableChannel 感興趣的事件註冊到 selector 多路複用器上(selector 實際上是通過呼叫作業系統層面的 select、epoll 方法來獲取當前可用的時間)

與之對應的感興趣的事件用 SelectionKey 來表示

  • OP_READ = 1 << 0; 可讀
  • OP_WRITE = 1 << 2; 可寫
  • OP_CONNECT = 1 << 3; // 完成連線
  • OP_ACCEPT = 1 << 4; // 接收連線

處理流程圖:

Java BIO,AIO總結

程式碼示例:

  1. 通過 ServerSocketChannel 監聽 8082 埠
  2. 設定為非阻塞
  3. 選擇與作業系統適配的選擇器,serverSocketChannel 的 OP_ACCEPT 事件註冊到 selector 選擇器上
  4. 當OP_ACCEPT 事件觸發時,將所有建立好的Socketchannel 連線的感興趣的事件(這裡為 read事件)再次註冊到Selector 上
    // 1.根據作業系統選擇適當的底層 io複用方法
    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    serverSocketChannel.bind(new InetSocketAddress(8082));
    //2.設定為非阻塞
    serverSocketChannel.configureBlocking(false);
    //3.選擇與作業系統適配的選擇器
    Selector selector = Selector.open();
    //將 serverSocket 的OP_ACCEPT 事件註冊到 selector 選擇器上
    serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
    while (true) {
      // 4.監聽當前連線建立情況
      int select = selector.select();
      if (select > 0) {
        //判斷連線業務型別
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> iterator = set.iterator();
        while (iterator.hasNext()) {
          SelectionKey key = iterator.next();
          iterator.remove();
          //建立連線
          if (key.isAcceptable()) {
            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            //通過 accept 方法獲取與 server端 已經建立好的 socket連線
            SocketChannel sc = ssc.accept();
            //設定為非阻塞
            sc.configureBlocking(false);
            //註冊感興趣的事件為 READ
            sc.register(selector,SelectionKey.OP_READ);
          }
          //可讀
          else if (key.isReadable()) {
            SocketChannel socket = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socket.read(byteBuffer);
            System.out.println(new String(byteBuffer.array(),StandardCharsets.UTF_8));
            key.interestOps(SelectionKey.OP_WRITE);
          }
          //可寫
          else if (key.isWritable()) {
            SocketChannel socket = (SocketChannel) key.channel();
            socket.write(ByteBuffer.wrap("I'm receive your message".getBytes(StandardCharsets.UTF_8)));
            socket.close();
            System.out.println("連線關閉成功!");
          }
        }
      }
    }

AIO(asynchronous io)

NIO 2.0引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。
非同步的套接字通道時真正的非同步非阻塞I/O,對應於UNIX網路程式設計中的事件驅動I/O(AIO)。他不需要過多的Selector對註冊的通道進行輪詢即可實現非同步讀寫,從而簡化了NIO的程式設計模型。

程式碼示例

  private static void server() throws IOException {
    //根據作業系統建立對應的底層操作類
    AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
    channel.bind(new InetSocketAddress(8082));
    while (true) {
      Future<AsynchronousSocketChannel> future = channel.accept();
      try {
        AsynchronousSocketChannel asc = future.get();
        System.out.println("建立連線成功");
        Future<Integer> write = asc.write(ByteBuffer.wrap("Now let's exchange datas".getBytes(StandardCharsets.UTF_8)));
        while (!write.isDone()) {
          TimeUnit.SECONDS.sleep(2);
        }
        System.out.println("傳送資料完成");
        asc.close();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  private static void client() throws Exception {
    AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
    Future<Void> future = socketChannel.connect(new InetSocketAddress(8082));
    while (!future.isDone()) {
      TimeUnit.SECONDS.sleep(2);
    }
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    Future<Integer> read = socketChannel.read(buffer);
    while (!read.isDone()) {
      TimeUnit.SECONDS.sleep(2);
    }
    System.out.println("接收伺服器資料:" + new String(buffer.array(),read.get()));
  }

以上就是Java BIO,AIO總結的詳細內容,更多關於Java BIO,AIO的資料請關注我們其它相關文章!