Java NIO之Selector 淺析
Java NIO 由以下幾個核心部分組成:
1、Buffer 2、Channel 3、Selector
在Java NIO之Channel、Buffer中 簡單的介紹了Buffer,Channel ,本文主要講解NIO的Selector相關知識,Selector是整個NIO的核心,理解selector機制是理解整個NIO的關鍵所在。
理解selector 之前,最好要知道I/O模型,如果對Linux 網路程式設計(poll epoll select)熟悉那就更棒了,因為我以前接觸過Linux 網路程式設計,所以在接觸Java Nio的時候感覺很親切.
舉個栗子
/** * server 端 */ public class Server { private ByteBuffer readBuffer = ByteBuffer.allocateDirect(1024); private ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024); private Selector selector; public Server() throws IOException{ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //設定非阻塞模式 serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(8080)); System.out.println("listening on port 8080"); //開啟 selector this.selector = Selector.open(); //在 selector 註冊感興趣的事件 serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); } private void start() throws Exception{ while(true){ //呼叫阻塞的select,等待 selector上註冊的事件發生 this.selector.select(); //獲取就緒事件 Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey selectionKey = iterator.next(); //先移除該事件,避免重複通知 iterator.remove(); // 新連線 if(selectionKey.isAcceptable()){ System.out.println("isAcceptable"); ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel(); // 新註冊channel SocketChannel socketChannel = server.accept(); if(socketChannel==null){ continue; } //非阻塞模式 socketChannel.configureBlocking(false); //註冊讀事件(服務端一般不註冊 可寫事件) socketChannel.register(selector, SelectionKey.OP_READ); ByteBuffer buffer = ByteBuffer.allocateDirect(1024); buffer.put("hi new channel".getBytes()); buffer.flip(); int writeBytes= socketChannel.write(buffer); } // 服務端關心的可讀,意味著有資料從client傳來了資料 if(selectionKey.isReadable()){ System.out.println("isReadable"); SocketChannel socketChannel = (SocketChannel)selectionKey.channel(); readBuffer.clear(); socketChannel.read(readBuffer); readBuffer.flip(); String receiveData= Charset.forName("UTF-8").decode(readBuffer).toString(); System.out.println("receiveData:"+receiveData); //這裡將收到的資料發回給客戶端 writeBuffer.clear(); writeBuffer.put(receiveData.getBytes()); writeBuffer.flip(); while(writeBuffer.hasRemaining()){ //防止寫緩衝區滿,需要檢測是否完全寫入 System.out.println("寫入資料:"+socketChannel.write(writeBuffer)); } } } } } public static void main(String[] args) throws Exception{ new Server().start(); } }
先貼一個簡單的例子,後面根據這個進行拆分。
Selector
Selector 一般稱 為選擇器 (或 多路複用器) 。它是Java NIO核心元件中的一個,用於檢查一個或多個NIO Channel(通道)的狀態是否處於可讀、可寫。可以實現單執行緒管理多個channels,也就是說可以管理多個網路連線。 使用 Selector 的圖解如下:
為了使用 Selector, 我們首先需要將 Channel 註冊到 Selector 中, 隨後呼叫 Selector 的 select()方法, 這個方法會阻塞, 直到註冊在 Selector 中的 Channel 傳送可讀寫事件(或其它註冊事件). 當這個方法返回後, 當前的這個執行緒就可以處理 Channel 的事件了(已準備就緒的Channel).
建立Selector
通過 Selector.open()方法, 我們可以建立一個選擇器:
Selector selector = Selector.open();
這裡提一下Selector 在Windows和Linux 上有不同的實現
將 Channel 註冊到Selector 中
我們需要將 Channel 註冊到Selector 中,這樣才能通過 Selector 監控 Channel :
//非阻塞模式 channel.configureBlocking(false); SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
注意, 如果一個 Channel 要註冊到 Selector 中, 那麼這個 Channel 必須是非阻塞的, 即channel.configureBlocking(false); 因為 Channel 必須要是非阻塞的, 因此 FileChannel 是不能夠使用選擇器的, 因為 FileChannel 都是阻塞的.
因為 channel 是非阻塞的,因此當沒有資料的時候會理解返回,因此 實際上 Selector 是不斷的在輪詢其註冊的 channel 是否有資料就緒。
在使用 Channel.register()方法時, 第二個引數指定了我們對 Channel 的什麼型別的事件感興趣, 這些事件有:
- Connect, 連線事件(TCP 連線), 對應於SelectionKey.OP_CONNECT
- Accept, 確認事件, 對應於SelectionKey.OP_ACCEPT
- Read, 讀事件, 對應於SelectionKey.OP_READ, 表示 buffer 可讀.
- Write, 寫事件, 對應於SelectionKey.OP_WRITE, 表示 buffer 可寫.
我們可以使用或運算|來組合多個事件, 例如:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey
當我們使用 register 註冊一個 Channel 時, 會返回一個 SelectionKey 物件, 這個物件包含了如下內容:
- interest set, 即我們感興趣的事件集
- ready set
- channel
- selector
- attached object, 可選的附加物件
interest set
我們可以通過如下方式獲取 interest set:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
ready set
代表了 Channel 已經就緒的操作.,我們可以使用如下方法進行判斷:
int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
Channel 和 Selector
我們可以通過 SelectionKey 獲取相對應的 Channel 和 Selector:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
Attaching Object
我們可以在selectionKey中附加一個物件:
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
或者在註冊時直接附加:
SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);
阻塞 的 select
select
呼叫 Selector 的 select()方法, 這個方法會阻塞, 直到註冊在 Selector 中的 Channel 傳送可讀寫事件(或其它註冊事件). 當這個方法返回後, 當前的這個執行緒就可以處理 Channel 的事件了(已準備就緒的Channel).
select(long timeout)
select(long timeout),超時阻塞等待timeout毫秒(引數),而不是 select()那樣一直阻塞等待,直到有事件就緒。
注意, select()方法返回的值表示有多少個 Channel 可操作.
獲取就緒的 Channel(或 事件)
如果 select()方法返回值表示有多個 Channel 準備好了, 那麼我們可以通過 Selected key set 訪問這個 Channel:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
keyIterator.remove();
//可能有多個註冊事件就緒
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
}
if (key.isConnectable()) {
// a connection was established with a remote server.
}
if (key.isReadable()) {
// a channel is ready for reading
}
if (key.isWritable()) {
// a channel is ready for writing
}
}
注意, 在每次迭代時, 我們都呼叫 “keyIterator.remove()” 將這個 key 從迭代器中刪除, 因為 select() 方法僅僅是簡單地將就緒的 IO 操作放到 selectedKeys 集合中, 因此如果我們從 selectedKeys 獲取到一個 key, 但是沒有將它刪除, 那麼下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.
喚醒
選擇器執行選擇的過程,系統底層會依次詢問每個通道是否已經就緒,這個過程可能會造成呼叫執行緒進入阻塞狀態,wakeup方式可以喚醒在select()方法中阻塞的執行緒。
selector.wakeup()
Selector 整體使用 流程
1、建立 ServerSocketChannel 2、通過 Selector.open() 開啟一個 Selector. 3、將 Channel 註冊到 Selector 中, 並設定需要監聽的事件 4、迴圈: 1、呼叫 select() 方法 2、呼叫 selector.selectedKeys() 獲取 就緒 Channel 3、迭代每個 selected key:
- 就處理 就緒的IO事件
- 根據需要更改 selected key 的監聽事件.
- 將已經處理過的 key 從 selected keys 集合中刪除.
最後這裡附上和前面對應的客戶端的程式碼:
/**
* client 端
*/
public class Client{
private final ByteBuffer sendBuffer=ByteBuffer.allocate(1024);
private final ByteBuffer receiveBuffer=ByteBuffer.allocate(1024);
private Selector selector;
private SocketChannel socketChannel;
public Client()throws IOException{
this.socketChannel = SocketChannel.open();
this.socketChannel.connect(new InetSocketAddress(InetAddress.getLocalHost(),8080));
this.socketChannel.configureBlocking(false);
System.out.println("連線建立成功");
this.selector=Selector.open();
this.socketChannel.register(selector,SelectionKey.OP_READ);
}
public static void main(String[] args) throws Exception{
final Client client=new Client();
Thread sendMsg=new Thread(client::sendInputMsg);
sendMsg.start();
client.start();
}
private void start()throws IOException {
while (selector.select() > 0 ){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while (it.hasNext()){
SelectionKey key = it.next();
it.remove();
if (key.isReadable()) {
System.out.println("isReadable");
receive(key);
}
}
}
}
/**
* 接收服務端傳送的內容
* @param key
* @throws IOException
*/
private void receive(SelectionKey key)throws IOException{
SocketChannel socketChannel=(SocketChannel)key.channel();
socketChannel.read(receiveBuffer);
receiveBuffer.flip();
String receiveData=Charset.forName("UTF-8").decode(receiveBuffer).toString();
System.out.println("receive server message:"+receiveData);
receiveBuffer.clear();
}
/**
* 傳送控制檯輸入內容至伺服器
*/
private void sendInputMsg() {
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(System.in));
try{
String msg;
while ((msg = bufferedReader.readLine()) != null){
synchronized(sendBuffer){
sendBuffer.put((msg+"\r\n").getBytes());
sendBuffer.flip();
while(sendBuffer.hasRemaining()){
socketChannel.write(sendBuffer);
}
sendBuffer.compact();
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}