Java1.4從BIO模型發展到NIO模型
簡介
前面幾章我們已經學習了 NIO 的核心元件,再次不多贅述。
- NIO 緩衝區 Buffer
- 多路複用選擇器 Selector
本章我們就將使用 NIO 的核心元件來實現 TCP 的客戶端和服務端。
BIO 模型
為什麼需要把 Acceptor 和業務處理放到不同的執行緒中?
主要原因是套接字的掛起連線數的數量是有限的。以下是 ServerSocketChannel 的 API
API:java.nio.channels 1.4
- ServerSocketChannel bind(SocketAddress local, int backlog)
繫結套接字管道到指定的地址,並設定套接字上的最大掛起連線數 backlog
while (true) {
channel.accept();
Thread.sleep(2000); // 這2秒種示意讀寫阻塞或者業務處理時長
}
這樣就會大大增加被服務端拒絕連線的客戶端數量,原理和 Socket 是相通的,詳細的,可以參看這篇文章 淺談 Java Socket 建構函式引數 backlog
服務端接收到套接字管道物件之後,交給新建的執行緒去進行處理,這就是典型的 BIO 模型,接下來我將 “換湯不換藥” 的用 java.nio.channels
中的元件來實現 BIO 模型中的服務端。
Java ServerSocketChannel 服務端
public class TcpServer { public static void main(String[] args) { try { // 1. 開啟管道 ServerSocketChannel channel = ServerSocketChannel.open(); // 2. 繫結埠 channel.bind(new InetSocketAddress(8081)); while (true) { // 3. 獲取套接字管道 SocketChannel socketChannel = channel.accept(); System.out.println("建立一個新的連線"); // BIO 模型,交給執行緒去處理 new Thread(new SocketChannelHandler(socketChannel)).start(); } } catch (IOException ex) { System.out.println("TcpServer " + ex); } } }
另外一個類
public class SocketChannelHandler implements Runnable { private SocketChannel channel; public SocketChannelHandler(SocketChannel channel) { this.channel = channel; } @Override public void run() { try { // 分配緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); while (true) { // 坑:讀取前先清空緩衝區,不然重複讀取到相同的內容 buffer.clear(); // 讀取資料 channel.read(buffer); System.out.print("Reading "); // 切換到讀模式 buffer.flip(); // 申請 JVM 位元組陣列 byte[] data = new byte[buffer.remaining()]; // 從緩衝區讀取到陣列中 buffer.get(data); String message = new String(data); if (message.equals("Exit")) { channel.close(); // 坑:如果不退出迴圈,下次 read 時會報錯 break; } else { System.out.println(new String(data)); } } } catch (IOException e) { System.out.println("SocketChannelHandler run:" + e); } } }
讀取前先清空緩衝區
如果我們註釋掉 buffer.clear()
,那麼接下來每次讀取到的資料都是第一次接收到的資料。
比如,我們使用 telnet 127.0.0.1 8081
開啟 Windows Telnet 客戶端,然後按 Ctrl
+ ]
,接著輸入 send 1
按下回車。
這樣,我們通過 Telnet 客戶端給我們的 Java 服務端傳送了一條 TCP 訊息。
再迴圈之後,channel.read(buffer) 直接返回 0,因為此時 buffer.remaining() == 0,控制檯一直輸出1!
所以我們需要在呼叫 int read(ByteBuffer buffer)
之前,先呼叫 Buffer clear()
清空緩衝區 Buffer,保證 TCP 資料的順利寫入。
關閉管道後退出迴圈
這個比較好理解,呼叫了 SocketChannel#close()
方法之後, SocketChannel.isOpen()
會返回 false,表示當前套接字管道已經關閉了。
此時,如果還去呼叫讀寫方法,例如 SocketChannel#read(ByteBuffer buffer)
,會丟擲 java.nio.channels.ClosedChannelException
異常。
NIO 模型
上一節,我們用管道實現了一個 BIO 阻塞式模型,在 BIO 模型中,服務端建立連線後就會立馬分配一個執行緒等待訊息到達。由於不知道什麼時候訊息能到達客戶端,所以主要一直阻塞等待。
能否等訊息到達之後在分配執行緒進行處理?這就需要 Selector 出場了。只要將管道設定為非阻塞模式,然後註冊至 Selector 。當訊息到達後就會得到通知。
API:java.nio.channels.SelectableChannel 1.4
- SelectableChannel configureBlocking(boolean block)
設定管道的阻塞模式。
1.初始化管道
初始化管道又分為 建立一個空的服務端套接字管道,繫結套接字埠,設定非阻塞模式,註冊事件
// 建立一個空的套接字管道
ServerSocketChannel channel = ServerSocketChannel.open();
// 坑:如果不設定非阻塞,還是阻塞式模型
channel.configureBlocking(false);
// 繫結埠
channel.bind(new InetSocketAddress(8081));
Selector selector = Selector.open();
// 註冊事件,監聽建立連線事件
channel.register(selector, SelectionKey.OP_ACCEPT);
SelectableChannel 註冊事件到 Selector 上,本質上是要建立兩者之間的一對一關係。我們可以看一下下面的 UML 圖:
為什麼是註冊 OP_ACCEPT 而不是其他事件呢?
因為 ServerSocketChannel 只能註冊 OP_ACCEPT 事件,其他事件都不會註冊成功。詳細請看 NIO入門之多路複用選擇器Selector
注意
- 如果我們不設定非阻塞,即呼叫
channel.configureBlocking(false);
,那麼註冊事件將會失敗。即下圖所示的 IllegalBlockingModeException 異常
2.遍歷鍵集
while (true) {
int count = selector.select();
if (count == 0) continue;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
} else if (selectionKey.isWritable()) {
handleWrite(selectionKey);
}
// 坑,如果不移除,同一個事件會重複處理
iterator.remove();
}
}
3.處理 OP_ACCEPT 接受連線,註冊管道
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = server.accept();
// 坑:雖然設定好了 ServerSocketChannel 是非阻塞的,但是還是需要設定 SocketChannel 也是非阻塞的
socketChannel.configureBlocking(false);
SelectionKey read = socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
System.out.println(read.equals(selectionKey)); // 輸出 false
這裡也需要 設定非阻塞,否則註冊時也會丟擲 IllegalBlockingModeException 異常。
需要使用 Selector 註冊每個新建立的套接字管道 SocketChannel。
一般來說,伺服器都是響應客戶端的請求的,所以會註冊 OP_READ 事件。如果註冊 OP_WRITE 事件,寫事件不會阻塞,會立即觸發。
4.處理 OP_READ 事件,讀取資料
private void handleRead(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
buffer.clear();
socketChannel.read(buffer);
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
System.out.print("Reading ");
System.out.println(new String(data));
SelectionKey write = socketChannel.register(selector, SelectionKey.OP_WRITE);
// SelectionKey write = selectionKey.interestOps(SelectionKey.OP_WRITE);
System.out.println(write.equals(selectionKey));
}
- 呼叫 register 和 interestOps 這裡效果是一樣的。
因為 handleAccept 的時候,就已經註冊好 SocketChannel 和 Selector 的一對一關係了。所以就算是呼叫register
其實也就是呼叫interestOps
。
5.處理 OP_WRITE 事件
private void handleWrite(SelectionKey selectionKey) throws IOException {
System.out.println("Writing...");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.write(ByteBuffer.wrap("heartbeat\r\n".getBytes()));
selectionKey.interestOps(SelectionKey.OP_READ);
}
總結
java 1.4 引入 java.nio 包,使用這些 API 可以實現一個 BIO 模型,也可以實現 NIO 模型。
BIO模型在客戶端與服務端建立連線之後,服務端就會立即分配一個執行緒,但是服務端又需要阻塞執行緒來等待讀取客戶端傳送資料。
這樣就需要不斷建立新的執行緒應對不斷增加的服務端請求,而建立執行緒是需要消耗伺服器效能的。那麼可不可以等客戶端資料到達後再分配執行緒進行處理呢?
Selector 以及基於事件處理的 NIO 模型“應運而生”。
參考原始碼
服務端
public class TcpServer {
public static void main(String[] args) {
try {
ServerSocketChannel channel = ServerSocketChannel.open();
// 坑:如果不設定非阻塞,還是阻塞式模型
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(8081));
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
new Thread(new SelectorIO(selector), "Selector-IO").start();
System.in.read(); // 阻塞主執行緒
} catch (IOException ex) {
System.out.println("TcpServer " + ex);
}
}
}
IO 執行緒
public class SelectorIO implements Runnable {
private Selector selector;
public SelectorIO(Selector selector) {
this.selector = selector;
}
@Override
public void run() {
try {
while (true) {
int count = selector.select();
if (count == 0) continue;
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
} else if (selectionKey.isWritable()) {
handleWrite(selectionKey);
}
// 坑,如果不移除,同一個事件會重複處理
iterator.remove();
}
}
} catch (Exception e) {
System.out.println("SelectorIO run error." + e);
e.printStackTrace();
}
}
private void handleWrite(SelectionKey selectionKey) throws IOException {
System.out.println("Writing...");
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.write(ByteBuffer.wrap("heartbeat\r\n".getBytes()));
selectionKey.interestOps(SelectionKey.OP_READ);
}
private void handleRead(SelectionKey selectionKey) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
buffer.clear();
socketChannel.read(buffer);
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
System.out.print("Reading ");
System.out.println(new String(data));
SelectionKey write = socketChannel.register(selector, SelectionKey.OP_WRITE);
// SelectionKey write = selectionKey.interestOps(SelectionKey.OP_WRITE);
System.out.println(write.equals(selectionKey));
}
private void handleAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel server = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = server.accept();
// 坑:雖然設定好了 ServerSocketChannel 是非阻塞的,但是還是需要設定 SocketChannel 也是非阻塞的
socketChannel.configureBlocking(false);
SelectionKey read = socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
System.out.println(read.equals(selectionKey));
}
}