Java NIO之選擇器分析
目錄
簡介
傳統的IO流請求都是一個請求需要一個執行緒來處理,如果請求數量比較龐大,那麼對於作業系統來說,執行緒佔用一定的記憶體,執行緒上下文切換開銷也很大。選擇器Selector是java NIO中用來檢測多個通道的就緒情況(是否準備好讀寫),使用單個執行緒可以管理多個通道。執行緒可以休眠,直到註冊到選擇器上的通道處於就緒狀態或者週期性輪詢檢測是否有通道處於就緒狀態。一個或多個可選擇的通道註冊到選擇器物件上,將會返回一個表示選擇器和通道的鍵,選擇鍵物件裡面包含了對應的通道和選擇器。需要注意的是選擇器與通道一起使用,通道必須是可選擇的(即繼承SelectableChannel類),非阻塞模式,所以FileChannel無法與選擇器Selector一起使用,所有的套接字通道都可以。
選擇器Selector
1.建立
選擇器物件可以通道呼叫靜態方法open()建立例項.
Selector selector = Selector.open();
2.註冊通道
可選擇通道(SelectableChannel)包含所有套接字通道,不包含FileChannel通道可以註冊到選擇器上面,同時可以指定感興趣的操作。通道註冊到選擇器之前,需要將通道設定成非阻塞式的。可選擇通道里面定義register()和設定阻塞模式的方法,管理這些通道的真正的是選擇器,並且也會管理表示選擇器和通道的鍵。
channel.configureBlocking(false); SelectionKey key = channel.register(selector,int ops);
對於register()方法中的第二個引數,表示的是感興趣的操作,選擇器監視通道感興趣的事件,包含了四種不同型別的事件或者操作:讀(read),寫(write),連線(connect),接受(accept),並不是所有的操作可以在所有的通道上使用,比如:SocketChannel不支援accept()操作。呼叫通道中的方法validOps()方法可以獲取通道可支援的操作,四種不同操作用選擇鍵SelectionKey中四個常量來表示的。
- SelectionKey.OP_CONNECT.
- SelectionKey.OP_ACCEPT.
- SelectionKey.OP_READ.
- SelectionKey.OP_WRITE.
對於兩種及以上的感興趣的事件,可以使用位操作“或”連線。
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE
3.關閉
當不在使用選擇器時,呼叫close()釋放所有佔用的資源並將所有的選擇鍵設定為無效。
4.wakeUp()
某個執行緒呼叫select()方法後阻塞了,在沒有通道已經就緒的情況下,使用別的執行緒在第一個執行緒呼叫select()方法的那個物件上面呼叫wakeUp(),可以使阻塞在select()方法上的執行緒立即返回。
選擇鍵SelectionKey
任何一個通道和選擇器的註冊關係被封裝在了SelectionKey物件裡面。選擇鍵物件包含如下的屬性:
- interest集合
- ready集合
- channel和selector
- 附加物件
1.interest集合
用於表示通道和選擇器組合體所關係的物件,當前的interest集合可以通過呼叫鍵物件中interestOps()方法獲取,可以通過interest集合和指定選擇鍵常量進行“位與”操作,檢視interest集合中是否包含某個確定的事件。
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;
2.ready集合
表示通道準備好要執行的操作,通過呼叫鍵物件中方法readOps()獲取相關通道已經就緒的操作,ready集合是interest集合的子集,並且表示interest集合中上次呼叫select()後就緒的操作。可以通過interest集合與指定的操作進行“位與”操作檢視通道中相關操作就緒,比如selectionKey.readOps() &SelectionKey.OP_WRITE來檢視ready集合中是否包含讀操作。不過選擇鍵SelectionKey物件中提供了四個方法,如下:
int readySet = selectionKey.readyOps()
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
3.channel和selector
選擇鍵物件裡面可以獲取註冊的通道和選擇器。
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
4.附加物件
選擇鍵物件中提供了方法允許向選擇鍵中新增附件,並在後面獲取它,是一種將任意物件與鍵關聯的便捷方法。
selectionKey.attach(theObject);
Object attachedObj = selectionKey.attachment();
此外SelectableChannel也提供了一個register()方法的過載版本可以進行接收一個Object型別的引數。通道註冊到選擇鍵上的時候可以直接新增附件。
SelectionKey key = channel.register (selector, SelectionKey.OP_READ, myObject);
等價於
SelectionKey key = channel.register (selector, SelectionKey.OP_READ);
key.attach (myObject)
通過Selector選擇通道
public abstract int select( ) throws IOException;
public abstract int select (long timeout) throws IOException;
public abstract int selectNow( ) throws IOException;
public abstract Set keys( )
public abstract Set selectedKeys( )
- select()-----阻塞直到有一個通道在註冊的事件上就緒。
- select(long timeout)-----阻塞時間到timeout毫秒後,不會一直阻塞。
- selectNow()-----完全非阻塞模式,不管什麼通道會立即返回。如果沒有通道就緒,會直接返回0。
- keys()-----與選擇器關聯的已經註冊鍵的集合,並不是所有註冊過的鍵都有效,通過keys()方法可以檢視。
- selectedKeys()-----已註冊鍵的子集,集合中的從成員都是相關的通道被選擇器判定為已就緒的通道。
select()方法返回的int值表示有多少通道準備就緒,不是已經準備好的所有通道的總數,而是上次呼叫過select()之後有多少個通道就緒了。之前呼叫select()就緒的通道,並不會在本次呼叫已就緒中通道計數中被計入。
通過selector中的方法selectedKeys()方法返回的“已選擇鍵的集合”,表示是已就緒的通道集合。跟ready集合不是同一個概念,ready集合表示的是一個通道準備好的操作。如下就是遍歷選擇鍵,然後在一個選擇鍵檢視對應的就緒的操作。
Selector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
int readyChannels = selector.select();
if(readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
}
案例
public class SocketChannelClient {
private Selector selector=null;
private SocketChannelClient initChannel() throws IOException, ClosedChannelException {
//開啟通道
SocketChannel channel = SocketChannel.open();
//配置成非阻塞模式
channel.configureBlocking(false);
//連線到埠1234
channel.connect(new InetSocketAddress(1234));
//建立選擇器
selector = Selector.open();
//註冊通道到選擇器上面, SelectionKey.OP_CONNECT操作
channel.register(selector, SelectionKey.OP_CONNECT);
return this;
}
public static void main(String[] args) throws IOException {
new SocketChannelClient().initChannel().listen();
}
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private void listen() throws IOException {
System.out.println("客戶端監聽....");
//輪詢selector
while(true) {
selector.select();
//遍歷所有的可選擇鍵的集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
//監控到連線事件,向通道寫入資訊,回覆客服端
if(key.isConnectable()) {
SocketChannel channel = (SocketChannel)key.channel();
if(channel.isConnectionPending()) {
channel.finishConnect();
}
channel.configureBlocking(false);
channel.write(writeInfoToServer("this is information from client..."));
//向選擇器上面註冊通道,SelectionKey.OP_READ操作
channel.register(selector, SelectionKey.OP_READ);
//監控可讀事件,接收服務端的資訊
}else if(key.isReadable()) {
receiveInfoFromServer(key);
}
//移除之前已經處理過的事件
iterator.remove();
}
}
}
private void receiveInfoFromServer(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel)key.channel();
buffer.clear();
int count;
while((count= channel.read(buffer))>0) {
buffer.flip();
String info = new String(buffer.array());
System.out.println("receive information from Server:"+info);
}
}
private static ByteBuffer writeInfoToServer(String info) throws UnsupportedEncodingException {
return ByteBuffer.wrap(info.getBytes("UTF-8"));
}
}
public class SocketChannelServer {
private static final int port = 1234;
Selector selector = null;
private SocketChannelServer initSocketChannel() throws IOException, ClosedChannelException {
//開啟一個ServerSocketChannel來監聽
ServerSocketChannel socketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = socketChannel.socket();
//配置成非阻塞模式
socketChannel.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(port));
//建立一個選擇器Selector
selector = Selector.open();
//將通道註冊到選擇器上面
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
return this;
}
public static void main(String[] args) throws IOException {
new SocketChannelServer().initSocketChannel().listen();
}
private void listen() throws IOException {
System.out.println("Listening on port "+port);
//輪詢selector
while(true) {
int select = selector.select();
if(select==0) {
continue;
}
//遍歷所有已選擇鍵
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while(iterator.hasNext()) {
SelectionKey key = iterator.next();
//監控到接收連線操作
if(key.isAcceptable()) {
ServerSocketChannel serverChannel =(ServerSocketChannel) key.channel();
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
sayHello(channel);
}
//監控到可讀操作時,接收客戶端的資訊
if(key.isReadable()) {
readInfoFromClient(key);
}
iterator.remove();
}
}
}
private void readInfoFromClient(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel)key.channel();
int count;
buffer.clear();
while((count= channel.read(buffer))>0) {
buffer.flip();
String info = new String(buffer.array());
System.out.println("receive information from client: "+info);
}
}
private ByteBuffer buffer = ByteBuffer.allocate(1024);
private void sayHello(SocketChannel channel) throws IOException {
buffer.clear();
buffer.put("welcome to here!\n\r".getBytes());
buffer.flip();
channel.write(buffer);
}
}