1. 程式人生 > >BIO到NIO原始碼的一些事兒之NIO 下 之 Selector

BIO到NIO原始碼的一些事兒之NIO 下 之 Selector

前言

此係列文章會詳細解讀NIO的功能逐步豐滿的路程,為Reactor-Netty 庫的講解鋪平道路。

關於Java程式設計方法論-Reactor與Webflux的視訊分享,已經完成了Rxjava 與 Reactor,b站地址如下:

Rxjava原始碼解讀與分享:www.bilibili.com/video/av345…

Reactor原始碼解讀與分享:www.bilibili.com/video/av353…

本系列原始碼解讀基於JDK11 api細節可能與其他版本有所差別,請自行解決jdk版本問題。

本系列前幾篇:

BIO到NIO原始碼的一些事兒之BIO

BIO到NIO原始碼的一些事兒之NIO 上

BIO到NIO原始碼的一些事兒之NIO 中

SelectionKey的引入

如我們在前面內容所講,在學生確定之後,我們就要對其狀態進行設定,然後再交由Selector進行管理,其狀態的設定我們就通過SelectionKey來進行。

那這裡我們先通過之前在Channel中並未仔細講解的SelectableChannel下的register方法。我們前面有提到過, SelectableChannelchannel打造成可以通過Selector來進行多路複用。作為管理者,channel想要實現複用,就必須在管理者這裡進行註冊登記。所以,SelectableChannel下的register

方法也就是我們值得二次關注的核心了,也是對接我們接下來內容的切入點,對於register方法的解讀,請看我們之前的文章BIO到NIO原始碼的一些事兒之NIO 上賦予Channel可被多路複用的能力這一節的內容。

這裡要記住的是SelectableChannel是對接channel特徵(即SelectionKey)的關鍵所在,這有點類似於表設計,原本可以將特徵什麼的設定在一張表內,但為了操作更加具有針對性,即為了讓程式碼功能更易於管理,就進行抽取並設計了第二張表,這個就有點像人體器官,整體上大家共同協作完成一件事,但器官內部自己專注於自己的主要特定功能,偶爾也具備其他器官的一些小功能。

由此,我們也就可以知道,SelectionKey

表示一個SelectableChannelSelector關聯的標記,可以簡單理解為一個token。就好比是我們做許可權管理系統使用者登入後前臺會從後臺拿到的一個token一樣,使用者可以憑藉此token來訪問操作相應的資源資訊。

//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException
{       ...
    synchronized (regLock) {
       ...
        synchronized (keyLock) {
           ...
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.attach(att);
                k.interestOps(ops);
            } else {
                // New registration
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
            return k;
        }
    }
}

複製程式碼

結合上下兩段原始碼,在每次Selector使用register方法註冊channel時,都會建立並返回一個SelectionKey

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
複製程式碼

我們在BIO到NIO原始碼的一些事兒之NIO 上賦予Channel可被多路複用的能力這一節的內容知道,一旦註冊到Selector上,Channel將一直保持註冊直到其被解除註冊。在解除註冊的時候會解除Selector分配給Channel的所有資源。 也就是SelectionKey在其呼叫SelectionKey#channel方法,或這個key所代表的channel 關閉,抑或此key所關聯的Selector關閉之前,都是有效。我們在前面的文章分析中也知道,取消一個SelectionKey,不會立刻從Selector移除,它將被新增到SelectorcancelledKeys這個Set集合中,以便在下一次選擇操作期間刪除,我們可以通過java.nio.channels.SelectionKey#isValid判斷一個SelectionKey是否有效。

SelectionKey包含四個操作集,每個操作集用一個Int來表示,int值中的低四位的bit 用於表示channel支援的可選操作種類。


   /**
    * Operation-set bit for read operations.
    */
   public static final int OP_READ = 1 << 0;

   /**
    * Operation-set bit for write operations.
    */
   public static final int OP_WRITE = 1 << 2;

   /**
    * Operation-set bit for socket-connect operations.
    */
   public static final int OP_CONNECT = 1 << 3;

   /**
    * Operation-set bit for socket-accept operations.
    */
   public static final int OP_ACCEPT = 1 << 4;
複製程式碼

interestOps

通過interestOps來確定了selector在下一個選擇操作的過程中將測試哪些操作類別的準備情況,操作事件是否是channel關注的。interestOpsSelectionKey建立時,初始化為註冊Selector時的ops值,這個值可通過sun.nio.ch.SelectionKeyImpl#interestOps(int)來改變,這點我們在SelectorImpl#register可以清楚的看到。

//sun.nio.ch.SelectionKeyImpl
public final class SelectionKeyImpl
   extends AbstractSelectionKey
{
   private static final VarHandle INTERESTOPS =
           ConstantBootstraps.fieldVarHandle(
                   MethodHandles.lookup(),
                   "interestOps",
                   VarHandle.class,
                   SelectionKeyImpl.class, int.class);

   private final SelChImpl channel;
   private final SelectorImpl selector;

   private volatile int interestOps;
   private volatile int readyOps;

   // registered events in kernel, used by some Selector implementations
   private int registeredEvents;

   // index of key in pollfd array, used by some Selector implementations
   private int index;

   SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
       channel = ch;
       selector = sel;
   }
  ...
}
複製程式碼

readyOps

readyOps表示通過Selector檢測到channel已經準備就緒的操作事件。在SelectionKey建立時(即上面原始碼所示),readyOps值為0,在Selectorselect操作中可能會更新,但是需要注意的是我們不能直接呼叫來更新。

SelectionKeyreadyOps表示一個channel已經為某些操作準備就緒,但不能保證在針對這個就緒事件型別的操作過程中不會發生阻塞,即該操作所線上程有可能會發生阻塞。在完成select操作後,大部分情況下會立即對readyOps更新,此時readyOps值最準確,如果外部的事件或在該channel有IO操作,readyOps可能不準確。所以,我們有看到其是volatile型別。

SelectionKey定義了所有的操作事件,但是具體channel支援的操作事件依賴於具體的channel,即具體問題具體分析。 所有可選擇的channel(即SelectableChannel的子類)都可以通過SelectableChannel#validOps方法,判斷一個操作事件是否被channel所支援,即每個子類都會有對validOps的實現,返回一個數字,僅標識channel支援的哪些操作。嘗試設定或測試一個不被channel所支援的操作設定,將會丟擲相關的執行時異常。 不同應用場景下,其所支援的Ops是不同的,摘取部分如下所示:

//java.nio.channels.SocketChannel#validOps
public final int validOps() {
    //即1|4|8  1101
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE
            | SelectionKey.OP_CONNECT);
}
//java.nio.channels.ServerSocketChannel#validOps
public final int validOps() {
    // 16
    return SelectionKey.OP_ACCEPT;
}
//java.nio.channels.DatagramChannel#validOps
public final int validOps() {
    // 1|4
    return (SelectionKey.OP_READ
            | SelectionKey.OP_WRITE);
}
複製程式碼

如果需要經常關聯一些我們程式中指定資料到SelectionKey,比如一個我們使用一個object表示上層的一種高階協議的狀態,object用於通知實現協議處理器。所以,SelectionKey支援通過attach方法將一個物件附加到SelectionKeyattachment上。attachment可以通過java.nio.channels.SelectionKey#attachment方法進行訪問。如果要取消該物件,則可以通過該種方式:selectionKey.attach(null)

需要注意的是如果附加的物件不再使用,一定要人為清除,如果沒有,假如此SelectionKey一直存在,由於此處屬於強引用,那麼垃圾回收器不會回收該物件,若不清除的話會成記憶體洩漏。

SelectionKey在由多執行緒併發使用時,是執行緒安全的。我們只需要知道,Selectorselect操作會一直使用在呼叫該操作開始時當前的interestOps所設定的值。

Selector探究

到現在為止,我們已經多多少少接觸了Selector,其是一個什麼樣的角色,想必都很清楚了,那我們就在我們已經接觸到的來進一步深入探究Selector的設計執行機制。

Selector的open方法

從命名上就可以知道 SelectableChannel物件是依靠Selector來實現多路複用的。 我們可以通過呼叫java.nio.channels.Selector#open來建立一個selector物件:

//java.nio.channels.Selector#open
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
複製程式碼

關於這個SelectorProvider.provider(),其使用了根據所在系統的預設實現,我這裡是windows系統,那麼其預設實現為sun.nio.ch.WindowsSelectorProvider,這樣,就可以呼叫基於相應系統的具體實現了。

//java.nio.channels.spi.SelectorProvider#provider
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
//sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider {

/**
 * Prevent instantiation.
 */
private DefaultSelectorProvider() { }

/**
 * Returns the default SelectorProvider.
 */
public static SelectorProvider create() {
    return new sun.nio.ch.WindowsSelectorProvider();
}

}
複製程式碼

基於windows來講,selector這裡最終會使用sun.nio.ch.WindowsSelectorImpl來做一些核心的邏輯。

public class WindowsSelectorProvider extends SelectorProviderImpl {

    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }
}
複製程式碼

這裡,我們需要來看一下WindowsSelectorImpl的建構函式:

//sun.nio.ch.WindowsSelectorImpl#WindowsSelectorImpl
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();

    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
複製程式碼

我們由Pipe.open()就可知道selector會保持開啟的狀態,直到其呼叫它的close方法:

//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
    boolean open = selectorOpen.getAndSet(false);
    if (!open)
        return;
    implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
    wakeup();
    synchronized (this) {
        implClose();
        synchronized (publicSelectedKeys) {
            // Deregister channels
            Iterator<SelectionKey> i = keys.iterator();
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                deregister(ski);
                SelectableChannel selch = ski.channel();
                if (!selch.isOpen() && !selch.isRegistered())
                    ((SelChImpl)selch).kill();
                selectedKeys.remove(ski);
                i.remove();
            }
            assert selectedKeys.isEmpty() && keys.isEmpty();
        }
    }
}
//sun.nio.ch.WindowsSelectorImpl#implClose
@Override
protected void implClose() throws IOException {
    assert !isOpen();
    assert Thread.holdsLock(this);

    // prevent further wakeup
    synchronized (interruptLock) {
        interruptTriggered = true;
    }

    wakeupPipe.sink().close();
    wakeupPipe.source().close();
    pollWrapper.free();

    // Make all remaining helper threads exit
    for (SelectThread t: threads)
            t.makeZombie();
    startLock.startThreads();
}

複製程式碼

可以看到,前面的wakeupPipe在close方法中關閉掉了。這裡的close方法中又涉及了wakeupPipe.sink()wakeupPipe.source()的關閉與pollWrapper.free()的釋放,此處也是我們本篇的難點所在,這裡,我們來看看它們到底是什麼樣的存在。 首先,我們對WindowsSelectorImpl(SelectorProvider sp)這個建構函式做下梳理:

  • 建立一個PollArrayWrapper物件(pollWrapper);
  • Pipe.open()開啟一個管道;
  • 拿到wakeupSourceFdwakeupSinkFd兩個檔案描述符;
  • 把pipe內Source端的檔案描述符(wakeupSourceFd)放到pollWrapper裡;

Pipe.open()的解惑

這裡我們會有疑惑,為什麼要建立一個管道,它是用來做什麼的。

我們來看Pipe.open()原始碼實現:

//java.nio.channels.Pipe#open
public static Pipe open() throws IOException {
    return SelectorProvider.provider().openPipe();
}
//sun.nio.ch.SelectorProviderImpl#openPipe
public Pipe openPipe() throws IOException {
    return new PipeImpl(this);
}
//sun.nio.ch.PipeImpl#PipeImpl
PipeImpl(final SelectorProvider sp) throws IOException {
    try {
        AccessController.doPrivileged(new Initializer(sp));
    } catch (PrivilegedActionException x) {
        throw (IOException)x.getCause();
    }
}
private class Initializer
implements PrivilegedExceptionAction<Void>
{

private final SelectorProvider sp;

private IOException ioe = null;

private Initializer(SelectorProvider sp) {
    this.sp = sp;
}

@Override
public Void run() throws IOException {
    LoopbackConnector connector = new LoopbackConnector();
    connector.run();
    if (ioe instanceof ClosedByInterruptException) {
        ioe = null;
        Thread connThread = new Thread(connector) {
            @Override
            public void interrupt() {}
        };
        connThread.start();
        for (;;) {
            try {
                connThread.join();
                break;
            } catch (InterruptedException ex) {}
        }
        Thread.currentThread().interrupt();
    }

    if (ioe != null)
        throw new IOException("Unable to establish loopback connection", ioe);

    return null;
}
複製程式碼

從上述原始碼我們可以知道,建立了一個PipeImpl物件, 在PipeImpl的建構函式裡會執行AccessController.doPrivileged,在它呼叫後緊接著會執行Initializerrun方法:

//sun.nio.ch.PipeImpl.Initializer.LoopbackConnector
private class LoopbackConnector implements Runnable {

    @Override
    public void run() {
        ServerSocketChannel ssc = null;
        SocketChannel sc1 = null;
        SocketChannel sc2 = null;

        try {
            // Create secret with a backing array.
            ByteBuffer secret = ByteBuffer.allocate(NUM_SECRET_BYTES);
            ByteBuffer bb = ByteBuffer.allocate(NUM_SECRET_BYTES);

            // Loopback address
            InetAddress lb = InetAddress.getLoopbackAddress();
            assert(lb.isLoopbackAddress());
            InetSocketAddress sa = null;
            for(;;) {
                // Bind ServerSocketChannel to a port on the loopback
                // address
                if (ssc == null || !ssc.isOpen()) {
                    ssc = ServerSocketChannel.open();
                    ssc.socket().bind(new InetSocketAddress(lb, 0));
                    sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                }

                // Establish connection (assume connections are eagerly
                // accepted)
                sc1 = SocketChannel.open(sa);
                RANDOM_NUMBER_GENERATOR.nextBytes(secret.array());
                do {
                    sc1.write(secret);
                } while (secret.hasRemaining());
                secret.rewind();

                // Get a connection and verify it is legitimate
                sc2 = ssc.accept();
                do {
                    sc2.read(bb);
                } while (bb.hasRemaining());
                bb.rewind();

                if (bb.equals(secret))
                    break;

                sc2.close();
                sc1.close();
            }

            // Create source and sink channels
            source = new SourceChannelImpl(sp, sc1);
            sink = new SinkChannelImpl(sp, sc2);
        } catch (IOException e) {
            try {
                if (sc1 != null)
                    sc1.close();
                if (sc2 != null)
                    sc2.close();
            } catch (IOException e2) {}
            ioe = e;
        } finally {
            try {
                if (ssc != null)
                    ssc.close();
            } catch (IOException e2) {}
        }
    }
}
}
複製程式碼

這裡即為建立pipe的過程,windows下的實現是建立兩個本地的socketChannel,然後連線(連線的過程通過寫一個隨機資料做兩個socket的連線校驗),兩個socketChannel分別實現了管道pipesourcesink端。 而我們依然不清楚這個pipe到底幹什麼用的, 假如大家熟悉系統呼叫的C/C++的話,就可以知道,一個阻塞在select上的執行緒有以下三種方式可以被喚醒:

  1. 有資料可讀/寫,或出現異常。
  2. 阻塞時間到,即time out
  3. 收到一個non-block的訊號。可由killpthread_kill發出。

所以,Selector.wakeup()要喚醒阻塞的select,那麼也只能通過這三種方法,其中:

  • 第二種方法可以排除,因為select一旦阻塞,無法修改其time out時間。
  • 而第三種看來只能在Linux上實現,Windows上沒有這種訊號通知的機制。

看來只有第一種方法了。假如我們多次呼叫Selector.open(),那麼在Windows上會每呼叫一次,就會建立一對自己和自己的loopbackTCP連線;在Linux上的話,每呼叫一次,會開一對pipe(pipe在Linux下一般都成對開啟),到這裡,估計我們能夠猜得出來——那就是如果想要喚醒select,只需要朝著自己的這個loopback連線發點資料過去,於是,就可以喚醒阻塞在select上的執行緒了。

我們對上面所述做下總結:在Windows下,Java虛擬機器在Selector.open()時會自己和自己建立loopbackTCP連線;在Linux下,Selector會建立pipe。這主要是為了Selector.wakeup()可以方便喚醒阻塞在select()系統呼叫上的執行緒(通過向自己所建立的TCP連結和管道上隨便寫點什麼就可以喚醒阻塞執行緒)。

PollArrayWrapper解讀

WindowsSelectorImpl構造器最後,我們看到這一句程式碼:pollWrapper.addWakeupSocket(wakeupSourceFd, 0);,即把pipe內Source端的檔案描述符(wakeupSourceFd)放到pollWrapper裡。pollWrapper作為PollArrayWrapper的例項,它到底是什麼,這一節,我們就來對其探索一番。

class PollArrayWrapper {

    private AllocatedNativeObject pollArray; // The fd array

    long pollArrayAddress; // pollArrayAddress

    @Native private static final short FD_OFFSET     = 0; // fd offset in pollfd
    @Native private static final short EVENT_OFFSET  = 4; // events offset in pollfd

    static short SIZE_POLLFD = 8; // sizeof pollfd struct

    private int size; // Size of the pollArray

    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }

    ...

    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }
    ...
   // Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, Net.POLLIN);
    }
}
複製程式碼

這裡將wakeupSourceFdPOLLIN事件標識為pollArrayEventOps的對應的值,這裡使用的是unsafe直接操作的記憶體,也就是相對於這個pollArray所在記憶體地址的偏移量SIZE_POLLFD * i + EVENT_OFFSET這個位置上寫入Net.POLLIN所代表的值,即參考下面本地方法相關原始碼所展示的值。putDescriptor同樣是這種類似操作。當sink端有資料寫入時,source對應的檔案描述符wakeupSourceFd就會處於就緒狀態。

//java.base/windows/native/libnio/ch/nio_util.h
    /* WSAPoll()/WSAPOLLFD and the corresponding constants are only defined   */
    /* in Windows Vista / Windows Server 2008 and later. If we are on an      */
    /* older release we just use the Solaris constants as this was previously */
    /* done in PollArrayWrapper.java.                                         */
    #define POLLIN       0x0001
    #define POLLOUT      0x0004
    #define POLLERR      0x0008
    #define POLLHUP      0x0010
    #define POLLNVAL     0x0020
    #define POLLCONN     0x0002
複製程式碼

AllocatedNativeObject這個類的父類有大量的unsafe類的操作,這些都是直接基於記憶體級別的操作。從其父類的構造器中,我們能也清楚的看到pollArray是通過unsafe.allocateMemory(size + ps)分配的一塊系統記憶體。

class AllocatedNativeObject                             // package-private
    extends NativeObject
{
    /**
     * Allocates a memory area of at least {@code size} bytes outside of the
     * Java heap and creates a native object for that area.
     */
    AllocatedNativeObject(int size, boolean pageAligned) {
        super(size, pageAligned);
    }

    /**
     * Frees the native memory area associated with this object.
     */
    synchronized void free() {
        if (allocationAddress != 0) {
            unsafe.freeMemory(allocationAddress);
            allocationAddress = 0;
        }
    }

}
//sun.nio.ch.NativeObject#NativeObject(int, boolean)
protected NativeObject(int size, boolean pageAligned) {
        if (!pageAligned) {
            this.allocationAddress = unsafe.allocateMemory(size);
            this.address = this.allocationAddress;
        } else {
            int ps = pageSize();
            long a = unsafe.allocateMemory(size + ps);
            this.allocationAddress = a;
            this.address = a + ps - (a & (ps - 1));
        }
    }
複製程式碼

至此,我們算是完成了對Selector.open()的解讀,其主要任務就是完成建立Pipe,並把pipe source端的wakeupSourceFd放入pollArray中,這個pollArraySelector完成其角色任務的樞紐。本篇主要圍繞Windows的實現來進行分析,即在windows下通過兩個連線的socketChannel實現了Pipelinux下則直接使用系統的pipe即可。

SelectionKey在selector中的管理

SelectionKey在selector中註冊

所謂的註冊,其實就是將一個物件放到註冊地物件內的一個容器欄位上,這個欄位可以是陣列,佇列,也可以是一個set集合,也可以是一個list。這裡,同樣是這樣,只不過,其需要有個返回值,那麼把這個要放入集合的物件返回即可。

//sun.nio.ch.SelectorImpl#register
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
                                        int ops,
                                        Object attachment)
{
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);

    // register (if needed) before adding to key set
    implRegister(k);

    // add to the selector's key set, removing it immediately if the selector
    // is closed. The key is not in the channel's key set at this point but
    // it may be observed by a thread iterating over the selector's key set.
    keys.add(k);
    try {
        k.interestOps(ops);
    } catch (ClosedSelectorException e) {
        assert ch.keyFor(this) == null;
        keys.remove(k);
        k.cancel();
        throw e;
    }
    return k;
}
//sun.nio.ch.WindowsSelectorImpl#implRegister
@Override
protected void implRegister(SelectionKeyImpl ski) {
    ensureOpen();
    synchronized (updateLock) {
        newKeys.addLast(ski);
    }
}
複製程式碼

這段程式碼我們之前已經有看過,這裡我們再次溫習下。 首先會新建一個SelectionKeyImpl物件,這個物件就是對Channel的包裝,不僅如此,還順帶把當前這個Selector物件給收了進去,這樣,我們也可以通過SelectionKey的物件來拿到其對應的Selector物件。

接著,基於windows平臺實現的implRegister,先通過ensureOpen()來確保該Selector是開啟的。接著將這個SelectionKeyImpl加入到WindowsSelectorImpl內針對於新註冊SelectionKey進行管理的newKeys之中,newKeys是一個ArrayDeque物件。對於ArrayDeque有不懂的,可以參考Java 容器原始碼分析之 Deque 與 ArrayDeque這篇文章。

然後再將此這個SelectionKeyImpl加入到sun.nio.ch.SelectorImpl#keys中去,這個Set<SelectionKey>集合代表那些已經註冊到當前這個Selector物件上的SelectionKey集合。我們來看sun.nio.ch.SelectorImpl的建構函式:

//sun.nio.ch.SelectorImpl#SelectorImpl
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
複製程式碼

也就是說,這裡的publicKeys就來源於keys,只是publicKeys屬於只讀的,我們想要知道當前Selector物件上所註冊的keys,就可以呼叫sun.nio.ch.SelectorImpl#keys來得到:

//sun.nio.ch.SelectorImpl#keys
@Override
public final Set<SelectionKey> keys() {
    ensureOpen();
    return publicKeys;
}
複製程式碼

再回到這個建構函式中,selectedKeys,顧名思義,其屬於已選擇Keys,即前一次操作期間,已經準備就緒的Channel所對應的SelectionKey。此集合為keys的子集。通過selector.selectedKeys()獲取。

//sun.nio.ch.SelectorImpl#selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
    ensureOpen();
    return publicSelectedKeys;
}
複製程式碼

我們看到其返回的是publicSelectedKeys,針對這個欄位裡的元素操作可以做刪除,但不能做增加。 在前面的內容中,我們有涉及到SelectionKey的取消,所以,我們在java.nio.channels.spi.AbstractSelector方法內,是有定義cancelledKeys的,也是一個HashSet物件。其代表已經被取消但尚未取消註冊(deregister)的SelectionKey。此Set集合無法直接訪問,同樣,它也是keys()的子集。

對於新的Selector例項,上面幾個集合均為空。由上面展示的原始碼可知,通過channel.registerSelectionKey新增keys中,此為key的來源。 如果某個selectionKey.cancel()被呼叫,那麼此key將會被新增到cancelledKeys這個集合中,然後在下一次呼叫selector select方法期間,此時canceldKeys不為空,將會觸發此SelectionKeyderegister操作(釋放資源,並從keys中移除)。無論通過channel.close()還是通過selectionKey.cancel(),都會導致SelectionKey被加入到cannceldKey中.

每次選擇操作(select)期間,都可以將key新增到selectedKeys中或者將從cancelledKeys中移除。

Selector的select方法的解讀

瞭解了上面的這些,我們來進入到select方法中,觀察下它的細節。由Selector的api可知,select操作有兩種形式,一種為 select(),selectNow(),select(long timeout);另一種為select(Consumer<SelectionKey> action, long timeout)select(Consumer<SelectionKey> action)selectNow(Consumer<SelectionKey> action)。後者為JDK11新加入的api,主要針對那些準備好進行I/O操作的channels在select過程中對相應的key進行的一個字的自定義的一個操作。 需要注意的是,有Consumer<SelectionKey> action引數的select操作是阻塞的,只有在選擇了至少一個Channel的情況下,才會呼叫此Selector例項的wakeup方法來喚醒,同樣,其所線上程被打斷也可以。

//sun.nio.ch.SelectorImpl
@Override
public final int select(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

//sun.nio.ch.SelectorImpl
@Override
public final int select(Consumer<SelectionKey> action, long timeout)
    throws IOException
{
    Objects.requireNonNull(action);
    if (timeout < 0)
        throw new IllegalArgumentException("Negative timeout");
    return lockAndDoSelect(action, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    {
        synchronized (this) {
            ensureOpen();
            if (inSelect)
                throw new IllegalStateException("select in progress");
            inSelect = true;
            try {
                synchronized (publicSelectedKeys) {
                    return doSelect(action, timeout);
                }
            } finally {
                inSelect = false;
            }
        }
    }

複製程式碼

我們可以觀察,無論哪種,它們最後都落在了lockAndDoSelect這個方法上,最終會執行特定系統上的doSelect(action, timeout)實現。 這裡我們以sun.nio.ch.WindowsSelectorImpl#doSelect為例來講述其操作執行的步驟:

// sun.nio.ch.WindowsSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
    throws IOException
    {
        assert Thread.holdsLock(this);
        this.timeout = timeout; // set selector timeout
        processUpdateQueue();  // <1>
        processDeregisterQueue(); // <2>
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();  // <3>
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();  // <4>
        int updated = updateSelectedKeys(action); // <5>
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket(); // <6>
        return updated;
    }
複製程式碼

processUpdateQueue解讀

  1. 首先通過相應作業系統實現類(此處是WindowsSelectorImpl)的具體實現我們可以知道,通過<1> 處的 processUpdateQueue()獲得關於每個剩餘Channel(有些Channel取消了)的在此刻的interestOps,這裡包括新註冊的和updateKeys,並對其進行pollWrapper的管理操作。

    • 即對於新註冊的SelectionKeyImpl,我們在相對於這個pollArray所在記憶體地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSETSIZE_POLLFD * totalChannels + EVENT_OFFSET分別存入SelectionKeyImpl的檔案描述符fd與其對應的EventOps(初始為0)。

    • updateKeys,因為是其之前已經在pollArray的某個相對位置上儲存過,這裡我們還需要對拿到的key的有效性進行判斷,如果有效,只需要將正在操作的這個SelectionKeyImpl物件的interestOps寫入到在pollWrapper中的存放它的EventOps位置上。

    注意: 在對newKeys進行key的有效性判斷之後,如果有效,會呼叫growIfNeeded()方法,這裡首先會判斷channelArray.length == totalChannels,此為一個SelectionKeyImpl的陣列,初始容量大小為8。channelArray其實就是方便Selector管理在冊SelectionKeyImpl數量的一個數組而已,通過判斷它的陣列長度大小,如果和totalChannels(初始值為1)相等,不僅僅是為了channelArray擴容,更重要的是為了輔助pollWrapper,讓pollWrapper擴容才是這裡的目的所在。 而當totalChannels % MAX_SELECTABLE_FDS == 0時,則多開一個執行緒處理selectorwindowsselect系統呼叫有最大檔案描述符限制,一次只能輪詢1024個檔案描述符,如果多於1024個,需要多執行緒進行輪詢。通過ski.setIndex(totalChannels)選擇鍵記錄下在陣列中的索引位置SelectionKeyImpl選擇鍵的對映關係,以待後續使用。同時呼叫pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels)在相對於這個pollArray所在記憶體地址的偏移量SIZE_POLLFD * totalChannels + FD_OFFSET這個位置上寫入wakeupSourceFd所代表的fdVal值。這樣在新起的執行緒就可以通過MAX_SELECTABLE_FDS來確定這個用來監控的wakeupSourceFd

   /**
    * sun.nio.ch.WindowsSelectorImpl#processUpdateQueue
    * Process new registrations and changes to the interest ops.
    */
private void processUpdateQueue() {
    assert Thread.holdsLock(this);

    synchronized (updateLock) {
        SelectionKeyImpl ski;

        // new registrations
        while ((ski = newKeys.pollFirst()) != null) {
            if (ski.isValid()) {
                growIfNeeded();
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
                pollWrapper.putEntry(totalChannels, ski);
                totalChannels++;
                MapEntry previous = fdMap.put(ski);
                assert previous == null;
            }
        }

        // changes to interest ops
        while ((ski