BIO到NIO原始碼的一些事兒之NIO 下 之 Selector
前言
此係列文章會詳細解讀NIO的功能逐步豐滿的路程,為Reactor-Netty 庫的講解鋪平道路。
關於Java程式設計方法論-Reactor與Webflux的視訊分享,已經完成了Rxjava 與 Reactor,b站地址如下:
Rxjava原始碼解讀與分享:www.bilibili.com/video/av345…
Reactor原始碼解讀與分享:www.bilibili.com/video/av353…
本系列原始碼解讀基於JDK11 api細節可能與其他版本有所差別,請自行解決jdk版本問題。
本系列前幾篇:
SelectionKey的引入
如我們在前面內容所講,在學生確定之後,我們就要對其狀態進行設定,然後再交由Selector
進行管理,其狀態的設定我們就通過SelectionKey
來進行。
那這裡我們先通過之前在Channel
中並未仔細講解的SelectableChannel
下的register
方法。我們前面有提到過, SelectableChannel
將channel
打造成可以通過Selector
來進行多路複用。作為管理者,channel
想要實現複用,就必須在管理者這裡進行註冊登記。所以,SelectableChannel
下的register
register
方法的解讀,請看我們之前的文章BIO到NIO原始碼的一些事兒之NIO 上 中賦予Channel可被多路複用的能力這一節的內容。
這裡要記住的是SelectableChannel
是對接channel
特徵(即SelectionKey
)的關鍵所在,這有點類似於表設計,原本可以將特徵什麼的設定在一張表內,但為了操作更加具有針對性,即為了讓程式碼功能更易於管理,就進行抽取並設計了第二張表,這個就有點像人體器官,整體上大家共同協作完成一件事,但器官內部自己專注於自己的主要特定功能,偶爾也具備其他器官的一些小功能。
由此,我們也就可以知道,SelectionKey
SelectableChannel
與Selector
關聯的標記,可以簡單理解為一個token
。就好比是我們做許可權管理系統使用者登入後前臺會從後臺拿到的一個token
一樣,使用者可以憑藉此token
來訪問操作相應的資源資訊。
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{ ...
synchronized (regLock) {
...
synchronized (keyLock) {
...
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
複製程式碼
結合上下兩段原始碼,在每次Selector
使用register
方法註冊channel
時,都會建立並返回一個SelectionKey
。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
複製程式碼
我們在BIO到NIO原始碼的一些事兒之NIO 上 中賦予Channel可被多路複用的能力這一節的內容知道,一旦註冊到Selector
上,Channel
將一直保持註冊直到其被解除註冊。在解除註冊的時候會解除Selector
分配給Channel
的所有資源。 也就是SelectionKey
在其呼叫SelectionKey#channel
方法,或這個key所代表的channel
關閉,抑或此key所關聯的Selector
關閉之前,都是有效。我們在前面的文章分析中也知道,取消一個SelectionKey
,不會立刻從Selector
移除,它將被新增到Selector
的cancelledKeys
這個Set
集合中,以便在下一次選擇操作期間刪除,我們可以通過java.nio.channels.SelectionKey#isValid
判斷一個SelectionKey
是否有效。
SelectionKey包含四個操作集,每個操作集用一個Int來表示,int值中的低四位的bit 用於表示channel
支援的可選操作種類。
/**
* Operation-set bit for read operations.
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*/
public static final int OP_ACCEPT = 1 << 4;
複製程式碼
interestOps
通過interestOps
來確定了selector
在下一個選擇操作的過程中將測試哪些操作類別的準備情況,操作事件是否是channel
關注的。interestOps
在SelectionKey
建立時,初始化為註冊Selector
時的ops值,這個值可通過sun.nio.ch.SelectionKeyImpl#interestOps(int)
來改變,這點我們在SelectorImpl#register
可以清楚的看到。
//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
extends AbstractSelectionKey
{
private static final VarHandle INTERESTOPS =
ConstantBootstraps.fieldVarHandle(
MethodHandles.lookup(),
"interestOps",
VarHandle.class,
SelectionKeyImpl.class, int.class);
private final SelChImpl channel;
private final SelectorImpl selector;
private volatile int interestOps;
private volatile int readyOps;
// registered events in kernel, used by some Selector implementations
private int registeredEvents;
// index of key in pollfd array, used by some Selector implementations
private int index;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
...
}
複製程式碼
readyOps
readyOps
表示通過Selector
檢測到channel
已經準備就緒的操作事件。在SelectionKey
建立時(即上面原始碼所示),readyOps
值為0,在Selector
的select
操作中可能會更新,但是需要注意的是我們不能直接呼叫來更新。
SelectionKey
的readyOps
表示一個channel
已經為某些操作準備就緒,但不能保證在針對這個就緒事件型別的操作過程中不會發生阻塞,即該操作所線上程有可能會發生阻塞。在完成select
操作後,大部分情況下會立即對readyOps
更新,此時readyOps
值最準確,如果外部的事件或在該channel
有IO操作,readyOps
可能不準確。所以,我們有看到其是volatile
型別。
SelectionKey
定義了所有的操作事件,但是具體channel
支援的操作事件依賴於具體的channel
,即具體問題具體分析。 所有可選擇的channel
(即SelectableChannel
的子類)都可以通過SelectableChannel#validOps
方法,判斷一個操作事件是否被channel
所支援,即每個子類都會有對validOps
的實現,返回一個數字,僅標識channel
支援的哪些操作。嘗試設定或測試一個不被channel
所支援的操作設定,將會丟擲相關的執行時異常。 不同應用場景下,其所支援的Ops
是不同的,摘取部分如下所示:
//java.nio.channels.SocketChannel#validOps
public final int validOps() {
//即1|4|8 1101
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE
| SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
// 16
return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
// 1|4
return (SelectionKey.OP_READ
| SelectionKey.OP_WRITE);
}
複製程式碼
如果需要經常關聯一些我們程式中指定資料到SelectionKey
,比如一個我們使用一個object表示上層的一種高階協議的狀態,object用於通知實現協議處理器。所以,SelectionKey支援通過attach
方法將一個物件附加到SelectionKey
的attachment
上。attachment
可以通過java.nio.channels.SelectionKey#attachment
方法進行訪問。如果要取消該物件,則可以通過該種方式:selectionKey.attach(null)
。
需要注意的是如果附加的物件不再使用,一定要人為清除,如果沒有,假如此SelectionKey
一直存在,由於此處屬於強引用,那麼垃圾回收器不會回收該物件,若不清除的話會成記憶體洩漏。
SelectionKey在由多執行緒併發使用時,是執行緒安全的。我們只需要知道,Selector
的select
操作會一直使用在呼叫該操作開始時當前的interestOps
所設定的值。
Selector探究
到現在為止,我們已經多多少少接觸了Selector
,其是一個什麼樣的角色,想必都很清楚了,那我們就在我們已經接觸到的來進一步深入探究Selector
的設計執行機制。
Selector的open方法
從命名上就可以知道 SelectableChannel
物件是依靠Selector
來實現多路複用的。 我們可以通過呼叫java.nio.channels.Selector#open
來建立一個selector
物件:
//java.nio.channels.Selector#open
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
複製程式碼
關於這個SelectorProvider.provider()
,其使用了根據所在系統的預設實現,我這裡是windows系統,那麼其預設實現為sun.nio.ch.WindowsSelectorProvider
,這樣,就可以呼叫基於相應系統的具體實現了。
//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {
/**
* Prevent instantiation.
*/
private DefaultSelectorProvider() { }
/**
* Returns the default SelectorProvider.
*/
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
}
複製程式碼
基於windows來講,selector這裡最終會使用sun.nio.ch.WindowsSelectorImpl
來做一些核心的邏輯。
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
複製程式碼
這裡,我們需要來看一下WindowsSelectorImpl
的建構函式:
//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製程式碼
我們由Pipe.open()
就可知道selector
會保持開啟的狀態,直到其呼叫它的close
方法:
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
assert !isOpen();
assert Thread.holdsLock(this);
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
pollWrapper.free();
// Make all remaining helper threads exit
for (SelectThread t: threads)
t.makeZombie();
startLock.startThreads();
}
複製程式碼
可以看到,前面的wakeupPipe
在close方法中關閉掉了。這裡的close方法中又涉及了wakeupPipe.sink()
與wakeupPipe.source()
的關閉與pollWrapper.free()
的釋放,此處也是我們本篇的難點所在,這裡,我們來看看它們到底是什麼樣的存在。 首先,我們對WindowsSelectorImpl(SelectorProvider sp)
這個建構函式做下梳理:
- 建立一個
PollArrayWrapper
物件(pollWrapper
); Pipe.open()
開啟一個管道;- 拿到
wakeupSourceFd
和wakeupSinkFd
兩個檔案描述符; - 把pipe內Source端的檔案描述符(
wakeupSourceFd
)放到pollWrapper
裡;
Pipe.open()的解惑
這裡我們會有疑惑,為什麼要建立一個管道,它是用來做什麼的。
我們來看Pipe.open()
原始碼實現:
//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private class Initializer
implements PrivilegedExceptionAction<Void>
{
private final SelectorProvider sp;
private IOException ioe = null;
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
@Override
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
if (ioe instanceof ClosedByInterruptException) {
ioe = null;
Thread connThread = new Thread(connector) {
@Override
public void interrupt() {}
};
connThread.start();
for (;;) {
try {
connThread.join();
break;
} catch (InterruptedException ex) {}
}
Thread.currentThread().interrupt();
}
if (ioe != null)
throw new IOException("Unable to establish loopback connection", ioe);
return null;
}
複製程式碼
從上述原始碼我們可以知道,建立了一個PipeImpl
物件, 在PipeImpl
的建構函式裡會執行AccessController.doPrivileged
,在它呼叫後緊接著會執行Initializer
的run
方法:
//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Create secret with a backing array.
ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);
// Loopback address
InetAddress lb = InetAddress.getLoopbackAddress();
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
do {
sc1.write(secret);
} while (secret.hasRemaining());
secret.rewind();
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
do {
sc2.read(bb);
} while (bb.hasRemaining());
bb.rewind();
if (bb.equals(secret))
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}
複製程式碼
這裡即為建立pipe
的過程,windows
下的實現是建立兩個本地的socketChannel
,然後連線(連線的過程通過寫一個隨機資料做兩個socket的連線校驗),兩個socketChannel
分別實現了管道pipe
的source
與sink
端。 而我們依然不清楚這個pipe
到底幹什麼用的, 假如大家熟悉系統呼叫的C/C++
的話,就可以知道,一個阻塞在select
上的執行緒有以下三種方式可以被喚醒:
- 有資料可讀/寫,或出現異常。
- 阻塞時間到,即
time out
。 - 收到一個
non-block
的訊號。可由kill
或pthread_kill
發出。
所以,Selector.wakeup()
要喚醒阻塞的select
,那麼也只能通過這三種方法,其中:
- 第二種方法可以排除,因為
select
一旦阻塞,無法修改其time out
時間。 - 而第三種看來只能在
Linux
上實現,Windows
上沒有這種訊號通知的機制。
看來只有第一種方法了。假如我們多次呼叫Selector.open()
,那麼在Windows
上會每呼叫一次,就會建立一對自己和自己的loopback
的TCP
連線;在Linux上的話,每呼叫一次,會開一對pipe
(pipe在Linux下一般都成對開啟),到這裡,估計我們能夠猜得出來——那就是如果想要喚醒select
,只需要朝著自己的這個loopback
連線發點資料過去,於是,就可以喚醒阻塞在select
上的執行緒了。
我們對上面所述做下總結:在Windows
下,Java
虛擬機器在Selector.open()
時會自己和自己建立loopback
的TCP
連線;在Linux
下,Selector
會建立pipe
。這主要是為了Selector.wakeup()
可以方便喚醒阻塞在select()
系統呼叫上的執行緒(通過向自己所建立的TCP
連結和管道上隨便寫點什麼就可以喚醒阻塞執行緒)。
PollArrayWrapper解讀
在WindowsSelectorImpl
構造器最後,我們看到這一句程式碼:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
,即把pipe內Source端的檔案描述符(wakeupSourceFd
)放到pollWrapper
裡。pollWrapper
作為PollArrayWrapper
的例項,它到底是什麼,這一節,我們就來對其探索一番。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
@Native private static final short FD_OFFSET = 0; // fd offset in pollfd
@Native private static final short EVENT_OFFSET = 4; // events offset in pollfd
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
...
// Adds Windows wakeup socket at a given index.
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
}
複製程式碼
這裡將wakeupSourceFd
的POLLIN
事件標識為pollArray
的EventOps
的對應的值,這裡使用的是unsafe直接操作的記憶體,也就是相對於這個pollArray
所在記憶體地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET
這個位置上寫入Net.POLLIN
所代表的值,即參考下面本地方法相關原始碼所展示的值。putDescriptor
同樣是這種類似操作。當sink端
有資料寫入時,source
對應的檔案描述符wakeupSourceFd
就會處於就緒狀態。
//java.base/windows/native/libnio/ch/nio_util.h
/* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined */
/* in Windows Vista / Windows Server 2008 and later. If we are on an */
/* older release we just use the Solaris constants as this was previously */
/* done in PollArrayWrapper.java. */
#define POLLIN 0x0001
#define POLLOUT 0x0004
#define POLLERR 0x0008
#define POLLHUP 0x0010
#define POLLNVAL 0x0020
#define POLLCONN 0x0002
複製程式碼
AllocatedNativeObject
這個類的父類有大量的unsafe
類的操作,這些都是直接基於記憶體級別的操作。從其父類的構造器中,我們能也清楚的看到pollArray
是通過unsafe.allocateMemory(size + ps)
分配的一塊系統記憶體。
class AllocatedNativeObject // package-private
extends NativeObject
{
/**
* Allocates a memory area of at least {@code size} bytes outside of the
* Java heap and creates a native object for that area.
*/
AllocatedNativeObject(int size, boolean pageAligned) {
super(size, pageAligned);
}
/**
* Frees the native memory area associated with this object.
*/
synchronized void free() {
if (allocationAddress != 0) {
unsafe.freeMemory(allocationAddress);
allocationAddress = 0;
}
}
}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
if (!pageAligned) {
this.allocationAddress = unsafe.allocateMemory(size);
this.address = this.allocationAddress;
} else {
int ps = pageSize();
long a = unsafe.allocateMemory(size + ps);
this.allocationAddress = a;
this.address = a + ps - (a & (ps - 1));
}
}
複製程式碼
至此,我們算是完成了對Selector.open()
的解讀,其主要任務就是完成建立Pipe
,並把pipe
source
端的wakeupSourceFd
放入pollArray
中,這個pollArray
是Selector
完成其角色任務的樞紐。本篇主要圍繞Windows的實現來進行分析,即在windows下通過兩個連線的socketChannel
實現了Pipe
,linux
下則直接使用系統的pipe
即可。
SelectionKey在selector中的管理
SelectionKey在selector中註冊
所謂的註冊,其實就是將一個物件放到註冊地物件內的一個容器欄位上,這個欄位可以是陣列,佇列,也可以是一個set集合,也可以是一個list。這裡,同樣是這樣,只不過,其需要有個返回值,那麼把這個要放入集合的物件返回即可。
//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// register (if needed) before adding to key set
implRegister(k);
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
newKeys.addLast(ski);
}
}
複製程式碼
這段程式碼我們之前已經有看過,這裡我們再次溫習下。 首先會新建一個SelectionKeyImpl
物件,這個物件就是對Channel
的包裝,不僅如此,還順帶把當前這個Selector
物件給收了進去,這樣,我們也可以通過SelectionKey
的物件來拿到其對應的Selector
物件。
接著,基於windows
平臺實現的implRegister
,先通過ensureOpen()
來確保該Selector
是開啟的。接著將這個SelectionKeyImpl
加入到WindowsSelectorImpl
內針對於新註冊SelectionKey進行管理的newKeys
之中,newKeys
是一個ArrayDeque
物件。對於ArrayDeque
有不懂的,可以參考Java 容器原始碼分析之 Deque 與 ArrayDeque這篇文章。
然後再將此這個SelectionKeyImpl
加入到sun.nio.ch.SelectorImpl#keys
中去,這個Set<SelectionKey>
集合代表那些已經註冊到當前這個Selector
物件上的SelectionKey
集合。我們來看sun.nio.ch.SelectorImpl
的建構函式:
//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = ConcurrentHashMap.newKeySet();
selectedKeys = new HashSet<>();
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
複製程式碼
也就是說,這裡的publicKeys
就來源於keys
,只是publicKeys
屬於只讀的,我們想要知道當前Selector
物件上所註冊的keys
,就可以呼叫sun.nio.ch.SelectorImpl#keys
來得到:
//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
ensureOpen();
return publicKeys;
}
複製程式碼
再回到這個建構函式中,selectedKeys
,顧名思義,其屬於已選擇Keys,即前一次操作期間,已經準備就緒的Channel
所對應的SelectionKey
。此集合為keys
的子集。通過selector.selectedKeys()
獲取。
//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}
複製程式碼
我們看到其返回的是publicSelectedKeys
,針對這個欄位裡的元素操作可以做刪除,但不能做增加。 在前面的內容中,我們有涉及到SelectionKey
的取消,所以,我們在java.nio.channels.spi.AbstractSelector
方法內,是有定義cancelledKeys
的,也是一個HashSet
物件。其代表已經被取消但尚未取消註冊(deregister)的SelectionKey
。此Set集合無法直接訪問,同樣,它也是keys()的子集。
對於新的Selector
例項,上面幾個集合均為空。由上面展示的原始碼可知,通過channel.register
將SelectionKey
新增keys
中,此為key的來源。 如果某個selectionKey.cancel()
被呼叫,那麼此key將會被新增到cancelledKeys
這個集合中,然後在下一次呼叫selector select
方法期間,此時canceldKeys
不為空,將會觸發此SelectionKey
的deregister
操作(釋放資源,並從keys
中移除)。無論通過channel.close()
還是通過selectionKey.cancel()
,都會導致SelectionKey
被加入到cannceldKey
中.
每次選擇操作(select)期間,都可以將key新增到selectedKeys
中或者將從cancelledKeys
中移除。
Selector的select方法的解讀
瞭解了上面的這些,我們來進入到select
方法中,觀察下它的細節。由Selector
的api可知,select
操作有兩種形式,一種為 select(),selectNow(),select(long timeout);另一種為select(Consumer<SelectionKey> action, long timeout)
,select(Consumer<SelectionKey> action)
,selectNow(Consumer<SelectionKey> action)
。後者為JDK11新加入的api,主要針對那些準備好進行I/O操作的channels在select過程中對相應的key進行的一個字的自定義的一個操作。 需要注意的是,有Consumer<SelectionKey> action
引數的select操作是阻塞的,只有在選擇了至少一個Channel的情況下,才會呼叫此Selector
例項的wakeup
方法來喚醒,同樣,其所線上程被打斷也可以。
//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
throws IOException
{
Objects.requireNonNull(action);
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
複製程式碼
我們可以觀察,無論哪種,它們最後都落在了lockAndDoSelect
這個方法上,最終會執行特定系統上的doSelect(action, timeout)
實現。 這裡我們以sun.nio.ch.WindowsSelectorImpl#doSelect
為例來講述其操作執行的步驟:
// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue(); // <1>
processDeregisterQueue(); // <2>
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
// Calculate number of helper threads needed for poll. If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset(); // reset finishLock
// Wakeup helper threads, waiting on startLock, so they start polling.
// Redundant threads will exit here after wakeup.
startLock.startThreads();
// do polling in the main thread. Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll(); // <3>
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
// Main thread is out of poll(). Wakeup others and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue(); // <4>
int updated = updateSelectedKeys(action); // <5>
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket(); // <6>
return updated;
}
複製程式碼
processUpdateQueue解讀
-
首先通過相應作業系統實現類(此處是WindowsSelectorImpl)的具體實現我們可以知道,通過
<1>
處的processUpdateQueue()
獲得關於每個剩餘Channel
(有些Channel取消了)的在此刻的interestOps
,這裡包括新註冊的和updateKeys
,並對其進行pollWrapper
的管理操作。-
即對於新註冊的
SelectionKeyImpl
,我們在相對於這個pollArray
所在記憶體地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
與SIZE_POLLFD * totalChannels + EVENT_OFFSET
分別存入SelectionKeyImpl
的檔案描述符fd
與其對應的EventOps
(初始為0)。 -
對
updateKeys
,因為是其之前已經在pollArray
的某個相對位置上儲存過,這裡我們還需要對拿到的key的有效性進行判斷,如果有效,只需要將正在操作的這個SelectionKeyImpl
物件的interestOps
寫入到在pollWrapper
中的存放它的EventOps
位置上。
注意: 在對
newKeys
進行key的有效性判斷之後,如果有效,會呼叫growIfNeeded()
方法,這裡首先會判斷channelArray.length == totalChannels
,此為一個SelectionKeyImpl
的陣列,初始容量大小為8。channelArray
其實就是方便Selector
管理在冊SelectionKeyImpl
數量的一個數組而已,通過判斷它的陣列長度大小,如果和totalChannels
(初始值為1)相等,不僅僅是為了channelArray
擴容,更重要的是為了輔助pollWrapper
,讓pollWrapper
擴容才是這裡的目的所在。 而當totalChannels % MAX_SELECTABLE_FDS == 0
時,則多開一個執行緒處理selector
。windows
上select
系統呼叫有最大檔案描述符限制,一次只能輪詢1024
個檔案描述符,如果多於1024個,需要多執行緒進行輪詢。通過ski.setIndex(totalChannels)
選擇鍵記錄下在陣列中的索引位置SelectionKeyImpl
選擇鍵的對映關係,以待後續使用。同時呼叫pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)
在相對於這個pollArray
所在記憶體地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET
這個位置上寫入wakeupSourceFd
所代表的fdVal
值。這樣在新起的執行緒就可以通過MAX_SELECTABLE_FDS
來確定這個用來監控的wakeupSourceFd
。 -
/**
* sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
* Process new registrations and changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
pollWrapper.putEntry(totalChannels, ski);
totalChannels++;
MapEntry previous = fdMap.put(ski);
assert previous == null;
}
}
// changes to interest ops
while ((ski