1. 程式人生 > 實用技巧 >Java1.4從BIO模型發展到NIO模型

Java1.4從BIO模型發展到NIO模型

簡介

前面幾章我們已經學習了 NIO 的核心元件,再次不多贅述。

BIO 模型


為什麼需要把 Acceptor 和業務處理放到不同的執行緒中?

主要原因是套接字的掛起連線數的數量是有限的。以下是 ServerSocketChannel 的 API

API:java.nio.channels 1.4

  • ServerSocketChannel bind(SocketAddress local, int backlog)
    繫結套接字管道到指定的地址,並設定套接字上的最大掛起連線數 backlog
while (true) {
      channel.accept();
      Thread.sleep(2000); // 這2秒種示意讀寫阻塞或者業務處理時長
}

這樣就會大大增加被服務端拒絕連線的客戶端數量,原理和 Socket 是相通的,詳細的,可以參看這篇文章 淺談 Java Socket 建構函式引數 backlog
服務端接收到套接字管道物件之後,交給新建的執行緒去進行處理,這就是典型的 BIO 模型,接下來我將 “換湯不換藥” 的用 java.nio.channels 中的元件來實現 BIO 模型中的服務端。

Java ServerSocketChannel 服務端

public class TcpServer {

    public static void main(String[] args) {
        try {
            // 1. 開啟管道
            ServerSocketChannel channel = ServerSocketChannel.open();
            // 2. 繫結埠
            channel.bind(new InetSocketAddress(8081));
            while (true) {
                // 3. 獲取套接字管道
                SocketChannel socketChannel = channel.accept();
                System.out.println("建立一個新的連線");
                // BIO 模型,交給執行緒去處理
                new Thread(new SocketChannelHandler(socketChannel)).start();
            }
        } catch (IOException ex) {
            System.out.println("TcpServer " + ex);
        }
    }
}

另外一個類

public class SocketChannelHandler implements Runnable {
    private SocketChannel channel;

    public SocketChannelHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void run() {
        try {
            // 分配緩衝區
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            while (true) {
                //  坑:讀取前先清空緩衝區,不然重複讀取到相同的內容
                buffer.clear();
                // 讀取資料
                channel.read(buffer);
                System.out.print("Reading ");
                // 切換到讀模式
                buffer.flip();
                // 申請 JVM 位元組陣列
                byte[] data = new byte[buffer.remaining()];
                // 從緩衝區讀取到陣列中
                buffer.get(data);
                String message = new String(data);
                if (message.equals("Exit")) {
                    channel.close();
                    // 坑:如果不退出迴圈,下次 read 時會報錯
                    break;
                } else {
                    System.out.println(new String(data));
                }
            }
        } catch (IOException e) {
            System.out.println("SocketChannelHandler run:" + e);
        }
    }
}

讀取前先清空緩衝區

如果我們註釋掉 buffer.clear(),那麼接下來每次讀取到的資料都是第一次接收到的資料。
比如,我們使用 telnet 127.0.0.1 8081 開啟 Windows Telnet 客戶端,然後按 Ctrl + ] ,接著輸入 send 1 按下回車。

這樣,我們通過 Telnet 客戶端給我們的 Java 服務端傳送了一條 TCP 訊息。

再迴圈之後,channel.read(buffer) 直接返回 0,因為此時 buffer.remaining() == 0,控制檯一直輸出1!

所以我們需要在呼叫 int read(ByteBuffer buffer) 之前,先呼叫 Buffer clear() 清空緩衝區 Buffer,保證 TCP 資料的順利寫入。

關閉管道後退出迴圈

這個比較好理解,呼叫了 SocketChannel#close() 方法之後, SocketChannel.isOpen() 會返回 false,表示當前套接字管道已經關閉了。
此時,如果還去呼叫讀寫方法,例如 SocketChannel#read(ByteBuffer buffer),會丟擲 java.nio.channels.ClosedChannelException 異常。

NIO 模型

上一節,我們用管道實現了一個 BIO 阻塞式模型,在 BIO 模型中,服務端建立連線後就會立馬分配一個執行緒等待訊息到達。由於不知道什麼時候訊息能到達客戶端,所以主要一直阻塞等待。
能否等訊息到達之後在分配執行緒進行處理?這就需要 Selector 出場了。只要將管道設定為非阻塞模式,然後註冊至 Selector 。當訊息到達後就會得到通知。

API:java.nio.channels.SelectableChannel 1.4

  • SelectableChannel configureBlocking(boolean block)
    設定管道的阻塞模式。

1.初始化管道

初始化管道又分為 建立一個空的服務端套接字管道,繫結套接字埠,設定非阻塞模式,註冊事件

// 建立一個空的套接字管道
ServerSocketChannel channel = ServerSocketChannel.open();
// 坑:如果不設定非阻塞,還是阻塞式模型
channel.configureBlocking(false);
// 繫結埠
channel.bind(new InetSocketAddress(8081));

Selector selector = Selector.open();
// 註冊事件,監聽建立連線事件
channel.register(selector, SelectionKey.OP_ACCEPT);

SelectableChannel 註冊事件到 Selector 上,本質上是要建立兩者之間的一對一關係。我們可以看一下下面的 UML 圖:

為什麼是註冊 OP_ACCEPT 而不是其他事件呢?
因為 ServerSocketChannel 只能註冊 OP_ACCEPT 事件,其他事件都不會註冊成功。詳細請看 NIO入門之多路複用選擇器Selector

注意

  • 如果我們不設定非阻塞,即呼叫 channel.configureBlocking(false);,那麼註冊事件將會失敗。即下圖所示的 IllegalBlockingModeException 異常

2.遍歷鍵集

while (true) {
      int count = selector.select();
      if (count == 0) continue;
      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            if (selectionKey.isAcceptable()) {
                  handleAccept(selectionKey);
            } else if (selectionKey.isReadable()) {
                  handleRead(selectionKey);
            } else if (selectionKey.isWritable()) {
                  handleWrite(selectionKey);
            }
            // 坑,如果不移除,同一個事件會重複處理
            iterator.remove();
      }
}

3.處理 OP_ACCEPT 接受連線,註冊管道

ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = server.accept();
// 坑:雖然設定好了 ServerSocketChannel 是非阻塞的,但是還是需要設定 SocketChannel 也是非阻塞的
socketChannel.configureBlocking(false);
SelectionKey read = socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
System.out.println(read.equals(selectionKey)); // 輸出 false

這裡也需要 設定非阻塞,否則註冊時也會丟擲 IllegalBlockingModeException 異常。
需要使用 Selector 註冊每個新建立的套接字管道 SocketChannel。
一般來說,伺服器都是響應客戶端的請求的,所以會註冊 OP_READ 事件。如果註冊 OP_WRITE 事件,寫事件不會阻塞,會立即觸發。

4.處理 OP_READ 事件,讀取資料

private void handleRead(SelectionKey selectionKey) throws IOException {
      ByteBuffer buffer = ByteBuffer.allocate(1024);
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      buffer.clear();
      socketChannel.read(buffer);
      buffer.flip();
      byte[] data = new byte[buffer.remaining()];
      buffer.get(data);
      System.out.print("Reading ");
      System.out.println(new String(data));
      SelectionKey write = socketChannel.register(selector, SelectionKey.OP_WRITE);
//      SelectionKey write = selectionKey.interestOps(SelectionKey.OP_WRITE);
      System.out.println(write.equals(selectionKey));
}
  • 呼叫 register 和 interestOps 這裡效果是一樣的。

    因為 handleAccept 的時候,就已經註冊好 SocketChannel 和 Selector 的一對一關係了。所以就算是呼叫 register 其實也就是呼叫 interestOps

5.處理 OP_WRITE 事件

private void handleWrite(SelectionKey selectionKey) throws IOException {
      System.out.println("Writing...");
      SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
      socketChannel.write(ByteBuffer.wrap("heartbeat\r\n".getBytes()));
      selectionKey.interestOps(SelectionKey.OP_READ);
}

總結

java 1.4 引入 java.nio 包,使用這些 API 可以實現一個 BIO 模型,也可以實現 NIO 模型。
BIO模型在客戶端與服務端建立連線之後,服務端就會立即分配一個執行緒,但是服務端又需要阻塞執行緒來等待讀取客戶端傳送資料。
這樣就需要不斷建立新的執行緒應對不斷增加的服務端請求,而建立執行緒是需要消耗伺服器效能的。那麼可不可以等客戶端資料到達後再分配執行緒進行處理呢?
Selector 以及基於事件處理的 NIO 模型“應運而生”。

參考原始碼

服務端

public class TcpServer {

    public static void main(String[] args) {
        try {
            ServerSocketChannel channel = ServerSocketChannel.open();
            // 坑:如果不設定非阻塞,還是阻塞式模型
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(8081));

            Selector selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT);

            new Thread(new SelectorIO(selector), "Selector-IO").start();
            System.in.read(); // 阻塞主執行緒
        } catch (IOException ex) {
            System.out.println("TcpServer " + ex);
        }
    }
}

IO 執行緒

public class SelectorIO implements Runnable {

    private Selector selector;

    public SelectorIO(Selector selector) {
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int count = selector.select();
                if (count == 0) continue;
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    if (selectionKey.isAcceptable()) {
                        handleAccept(selectionKey);
                    } else if (selectionKey.isReadable()) {
                        handleRead(selectionKey);
                    } else if (selectionKey.isWritable()) {
                        handleWrite(selectionKey);
                    }
                    // 坑,如果不移除,同一個事件會重複處理
                    iterator.remove();
                }
            }
        } catch (Exception e) {
            System.out.println("SelectorIO run error." + e);
            e.printStackTrace();
        }
    }

    private void handleWrite(SelectionKey selectionKey) throws IOException {
        System.out.println("Writing...");
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        socketChannel.write(ByteBuffer.wrap("heartbeat\r\n".getBytes()));
        selectionKey.interestOps(SelectionKey.OP_READ);
    }

    private void handleRead(SelectionKey selectionKey) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        buffer.clear();
        socketChannel.read(buffer);
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        System.out.print("Reading ");
        System.out.println(new String(data));
        SelectionKey write = socketChannel.register(selector, SelectionKey.OP_WRITE);
//        SelectionKey write = selectionKey.interestOps(SelectionKey.OP_WRITE);
        System.out.println(write.equals(selectionKey));
    }

    private void handleAccept(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
        SocketChannel socketChannel = server.accept();
        // 坑:雖然設定好了 ServerSocketChannel 是非阻塞的,但是還是需要設定 SocketChannel 也是非阻塞的
        socketChannel.configureBlocking(false);
        SelectionKey read = socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
        System.out.println(read.equals(selectionKey));
    }
}