NIO之終極Selctor原始碼分析
回顧一下Selector,多路複用器Selector可以管理多個Channel,可以向Selector註冊感興趣的事件,當事件就緒,通過Selector.selector方法獲取註冊的事件,進行相應的操作。
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress (1234));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator. remove();
if (key.isAcceptable()) {
...
} else if (key.isReadable() && key.isValid()) {
...
}
keys.remove(key);
}
}
1. 例項化分析
一般我們獲取Selector通過Selector.open()獲取,
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector ();
}
Selector內部是通過SelectorProvider類的Provider方法實現。
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
這裡使用單例模式,保證只有一個SelectorProvider。provider由sun.nio.ch.DefaultSelectorProvider.create()
建立。create方法內部通過系統的名稱來建立建立SelectorProvider,在這裡建立了sun.nio.ch.EPollSelectorProvider
public static SelectorProvider create() {
String var0 = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
if (var0.equals("SunOS")) {
return createProvider("sun.nio.ch.DevPollSelectorProvider");
} else {
return (SelectorProvider)(var0.equals("Linux") ? createProvider("sun.nio.ch.EPollSelectorProvider") : new PollSelectorProvider());
}
}
繼續回到Selector的open()中,獲取到SelecProvider例項之後,繼續呼叫openSelector(),很自然進入EPollSelectorProvider的openSelector()方法:
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorProvider(this);
}
進入EPollSelectorProvider的建構函式
EPollSelectorImpl(SelectorProvider var1) throws IOException {
super(var1);
long var2 = IOUtil.makePipe(false);
this.fd0 = (int)(var2 >>> 32);
this.fd1 = (int)var2;
this.pollWrapper = new EPollArrayWrapper();
this.pollWrapper.initInterrupt(this.fd0, this.fd1);
this.fdToKey = new HashMap();
}
初始化的主要步驟如下:
- 首先通過IOUtil.makePipe(false)返回了一個非堵塞的管道(pipe),底層是通過Linux的pipe系統呼叫實現的;返回兩個int常量,分別指向管道的讀、寫檔案描述符。讀端在高32位,寫端在低32位
- 接著定義了EPollArrayWrapper變數,並呼叫initInterrup方法,將fd0註冊到epoll;
- 最後定義了一個map型別的變數fdToKey,將channel的檔案描述符ID和SelectionKey建立對映關係,SelectionKey中儲存了Channel Selector 感興趣的事件。
EpollArrayWapper is what? EpollArrayWapper將Linux的epoll相關係統呼叫封裝成了native方法供EpollSelectorImpl使用,EpollArrayWapper類中有幾個比較重要的Navtive方法:
private native int epollCreate();
private native void epollCtl(int paramInt1, int paramInt2, int paramInt3, int paramInt4);
private native int epollWait(long paramLong1, int paramInt1, long paramLong2, int paramInt2) throws IOException;
看起來似乎比較眼熟,好像和Epoll的三個方法有些雷同,沒錯這三個native方法正是對上述epoll系列系統呼叫的包裝。
2. Resigter
下面分析serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)的底層邏輯
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) { // New registration synchronized (keyLock) {
if (!isOpen()) throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att); addKey(k);
}
} return k;
}
}
- 如果該channel和selector已經註冊過,則直接新增事件和附件。
- 否則通過selector實現註冊過程。 繼續呼叫SelectorImp的register方法:
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
-
以當前channel和selector為引數,初始化SelectionKeyImpl 物件selectionKeyImpl ,並新增附件attachment。
-
implRegister(k)將channel註冊到epoll中
-
k.interestOps(int) 的內部呼叫了nioInterestOps:
public SelectionKey nioInterestOps(int var1) {
this.channel.translateAndSetInterestOps(var1, this);
this.interestOps = var1;
return this;
}
- translateAndSetInterestOps方法會將註冊的感興趣的事件和其對應的檔案描述儲存到EPollArrayWrapper物件的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。以SeverSocketChannel的實現為例。
public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {
int var3 = 0;
if ((var1 & 16) != 0) {
var3 |= Net.POLLIN;
}
var2.selector.putEventOps(var2, var3);
}
//EpollSelector中
public void putEventOps(SelectionKeyImpl var1, int var2) {
SelChImpl var3 = var1.channel;
this.pollWrapper.setInterest(var3.getFDVal(), var2);
}
- 同時該操作還會將設定SelectionKey的interestOps欄位,這是給我們程式設計師獲取使用的。
EPollSelectorImpl.implRegister:
protected void implRegister(SelectionKeyImpl var1) {
SelChImpl var2 = var1.channel;
int var3 = Integer.valueOf(var2.getFDVal());
this.fdToKey.put(var3, var1);
this.pollWrapper.add(var3);
this.keys.add(var1);
}
- 將channel對應的fd(檔案描述符)和對應的SelectionKeyImpl放到fdToKey對映表中。
- 將channel對應的fd(檔案描述符)新增到EPollArrayWrapper中,並強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在於之前被取消過的註冊中。)
- 將selectionKey放入到keys集合中。
3. Select
Selector.select方法最終呼叫的是doSelect方法:
protected int doSelect(long paramLong) throws IOException{
//處理待取消的SelectionKey(呼叫SelectionKey.cancel()方法取消)
processDeregisterQueue();
try {
begin();
this.pollWrapper.poll(paramLong);
} finally {
end();
}
processDeregisterQueue();
int i = updateSelectedKeys();
if (this.pollWrapper.interrupted()) {
this.pollWrapper.putEventOps(this.pollWrapper.interruptedIndex(), 0);
synchronized (this.interruptLock) {
this.pollWrapper.clearInterrupted();
IOUtil.drain(this.fd0);
this.interruptTriggered = false;
}
}
return i;
}