1. 程式人生 > >Java NIO之選擇器

Java NIO之選擇器

1.簡介

前面的文章說了緩衝區,說了通道,本文就來說說 NIO 中另一個重要的實現,即選擇器 Selector。在更早的文章中,我簡述了幾種 IO 模型。如果大家看過之前的文章,並動手寫過程式碼的話。再看 Java 的選擇器大概就會知道它是什麼了,以及怎麼用了。選擇器是 Java 多路複用模型的一個實現,可以同時監控多個非阻塞套接字通道。示意圖大致如下:

如果大家瞭解過多路複用模型,那應該也會知道幾種複用模型的實現。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的選擇器並非憑空創造,而是在底層作業系統提供的介面的基礎上封裝而來。相關的細節,我隨後會進行分析。

關於 Java 選擇器的簡介這裡先說到這,接下來進入正題。

 2.基本操作及實現

本章我將對 Selector 的建立,通道的註冊,Selector 的選擇過程進行分析。內容篇幅較大,希望大家耐心看完。由於 Selector 相關類在不同作業系統下的實現是不同的,加之個人對 Linux epoll 更為熟悉,所以本文所分析的原始碼也是和 epoll 相關的。好了,進入正題吧。

 2.1 建立選擇器

選擇器 Selector 是一個抽象類,所以不能直接建立。Selector 提供了一個 open 方法,通過 open 方法既可以建立選擇器例項。示例程式碼如下:

1
Selector selector = Selector.open();

上面的程式碼比較簡單,只有一行。不過不要被表象迷惑,這行程式碼僅是完整實現的冰山一角,更復雜的邏輯則隱藏在水面之下。
在簡介一節,我已經說了 Java 選擇器是對底層多路複用介面的一個包裝,這裡的 open 方法也不例外。假設我們的 Java 執行在 Linux 平臺下,那麼 open 最終所做的事情應該是呼叫作業系統的epoll_create函式,用於建立 epoll 例項。真實情況是不是如此呢?答案就在冰山深處,接下來就讓我們一起去求索吧。下面我們將沿著 open 方法一路走下去,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
public abstract class Selector implements Closeable {
    public static Selector open() throws IOException {
        // 建立 SelectorProvider,再通過其 openSelector 方法建立 Selector
        return SelectorProvider.provider().openSelector();
    }
    // 省略無關程式碼
}

public abstract class SelectorProvider {
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            // 建立預設的 SelectorProvider
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }
}

public class DefaultSelectorProvider {
    private DefaultSelectorProvider() { }
    
    /**
     * 根據系統名稱建立相應的 SelectorProvider
     */
    public static SelectorProvider create() {
        String osname = AccessController
            .doPrivileged(new GetPropertyAction("os.name"));
        if (osname.equals("SunOS"))
            return createProvider("sun.nio.ch.DevPollSelectorProvider");
        if (osname.equals("Linux"))
            return createProvider("sun.nio.ch.EPollSelectorProvider");
        
        // 
        return new sun.nio.ch.PollSelectorProvider();
    }
    
    /**
     * 載入 SelectorProvider 類,並建立例項
     */
    @SuppressWarnings("unchecked")
    private static SelectorProvider createProvider(String cn) {
        Class<SelectorProvider> c;
        try {
            c = (Class<SelectorProvider>)Class.forName(cn);
        } catch (ClassNotFoundException x) {
            throw new AssertionError(x);
        }
        try {
            return c.newInstance();
        } catch (IllegalAccessException | InstantiationException x) {
            throw new AssertionError(x);
        }

    }
}

/**
 * 建立完 SelectorProvider,接下來要呼叫 openSelector 方法
 * 建立 Selector 的繼承類了。
 */
public class EPollSelectorProvider extends SelectorProviderImpl {
    public AbstractSelector openSelector() throws IOException {
        return new EPollSelectorImpl(this);
    }
}

class EPollSelectorImpl extends SelectorImpl {
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        // 呼叫父類構造方法
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        
        // 建立 EPollArrayWrapper,EPollArrayWrapper 是一個重要的實現
        pollWrapper = new EPollArrayWrapper();
        
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    }
}

public abstract class SelectorImpl extends AbstractSelector {
    protected SelectorImpl(SelectorProvider sp) {
        super(sp);
        keys = new HashSet<SelectionKey>();
        selectedKeys = new HashSet<SelectionKey>();
        
        /* 初始化 publicKeys 和 publicSelectedKeys,
         * publicKeys 即 selector.keys() 方法所返回的集合,
         * publicSelectedKeys 則是 selector.selectedKeys() 方法返回的集合
         */
        if (Util.atBugLevel("1.4")) {
            publicKeys = keys;
            publicSelectedKeys = selectedKeys;
        } else {
            publicKeys = Collections.unmodifiableSet(keys);
            publicSelectedKeys = Util.ungrowableSet(selectedKeys);
        }
    }
}

/**
 * EPollArrayWrapper 一個重要的實現,這一層再往下就是 C 程式碼了
 */
class EPollArrayWrapper {
    EPollArrayWrapper() throws IOException {
        // 呼叫 epollCreate 方法建立 epoll 檔案描述符
        epfd = epollCreate();
    
        // the epoll_event array passed to epoll_wait
        // 初始化 pollArray,該物件用於儲存就緒檔案描述符和事件
        int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
    
        // eventHigh needed when using file descriptors > 64k
        if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
            eventsHigh = new HashMap<>();
    }
    
    // epollCreate 方法是 native 型別的
    private native int epollCreate();
}

以上程式碼時 Java 層面的,Java 層呼叫棧最下面的類是 EPollArrayWrapper(原始碼路徑可以在附錄中查詢)。EPollArrayWrapper 是一個重要的實現,起著承上啟下的作用。上層是 Java 程式碼,下層是 C 程式碼。上層的程式碼看完了,接下來看看冰山深處的 C 程式碼:

1
2
3
4
5
6
7
8
9
10
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    // 呼叫 epoll_create 函式建立 epoll 例項,並返回檔案描述符 epfd
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

上面的程式碼很簡單,僅做了建立 epoll 例項這一件事。看到這裡,答案就明瞭了。最後在附一張時序圖幫助大家理清程式碼呼叫順序,如下:

Selector.open

 2.2 選擇鍵

 2.2.1 幾種事件

選擇鍵 SelectionKey 包含4種事件,分別是:

1
2
3
4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

事件之間可以通過或運算進行組合,比如:

1
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

 2.2.2 兩種事件集合:interestOps 和 readyOps

interestOps 即感興趣的事件集合,通道呼叫 register 方法註冊時會設定此值,interestOps 可通過 SelectionKey interestOps() 方法獲取。readyOps 是就緒事件集合,可通過 SelectionKey readyOps() 獲取。

interestOps 和 readyOps 被宣告在 SelectionKey 子類 SelectionKeyImpl 中,程式碼如下:

1
2
3
4
public class SelectionKeyImpl extends AbstractSelectionKey {
    private volatile int interestOps;
    private int readyOps;
}

接下來再來看看與 readyOps 事件集合相關的幾個方法,如下:

1
2
3
4
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

以上方法從字面意思上就可以知道有什麼用,這裡就不解釋了。接下來以 isReadable 方法為例,簡單看一下這個方法是如何實現。

1
2
3
public final boolean isReadable() {
    return (readyOps() & OP_READ) != 0;
}

上面說到可以通過或運算組合事件,這裡則是通過與運算來測試某個事件是否在事件集合中。比如

1
2
3
readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101,
readyOps & OP_READ = 0101 & 0001 = 0001,
readyOps & OP_CONNECT = 0101 & 1000 = 0

readyOps & OP_READ != 0,所以 OP_READ 在事件集合中。readyOps & OP_CONNECT == 0,所以 OP_CONNECT 不在事件集合中。

 2.2.3 attach 方法

attach 是一個好用的方法,通過這個方法,可以將物件暫存在 SelectionKey 中,待需要的時候直接取出來即可。比如本文對應的練習程式碼實現了一個簡單的 HTTP 伺服器,在讀取使用者請求資料後(即 selectionKey.isReadable() 為 true),會去解析請求頭,然後將請求頭資訊通過 attach 方法放入 selectionKey 中。待通道可寫後,再從 selectionKey 中取出請求頭,並根據請求頭回復客戶端不同的訊息。當然,這只是一個應用場景,attach 可能還有其他的應用場景,比如標識通道。不過其他的場景我沒使用過,就不說了。attach 使用方式如下:

1
2
selectionKey.attach(obj);
Object attachedObj = selectionKey.attachment();

 2.3 通道註冊

通道註冊即將感興趣的事件告知 Selector,待事件發生時,Selector 即可返回就緒事件,我們就可以去做後續的事情了。比如 ServerSocketChannel 通道通常對 OP_ACCEPT 事件感興趣,那麼我們就可以把這個事件註冊給 Selector。待事件發生,即服務端接受客戶端連線後,我們即可獲取這個就緒的事件並做相應的操作。通道註冊的示例程式碼如下:

1
2
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

起初我以為通道註冊操作會呼叫作業系統的 epoll_ctl 函式,但最終通過看原始碼,發現自己的理解是錯的。既然通道註冊階段不呼叫 epoll_ctl 函式。那麼,epoll_ctl 什麼時候才會被呼叫呢?如果不呼叫 epoll_ctl,那麼註冊過程都幹了什麼事情呢?關於第一個問題,本節還無法解答,不過第二個問題則可以說說。接下來讓我們深入通道類 register 方法的呼叫棧中去探尋答案吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
    public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
        return register(sel, ops, null);
    }
    
    public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
}

public abstract class AbstractSelectableChannel extends SelectableChannel {
    
    private SelectionKey[] keys = null;
    
    public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
        synchronized (regLock) {
            // 省去一些校驗程式碼
            
            // 從 keys 陣列中查詢,查詢條件為 k.selector() == sel
            SelectionKey k = findKey(sel);
            
            // 如果 k 不為空,則修改 k 所感興趣的事件
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            
            // k 為空,則建立一個 SelectionKey,並存儲到 keys 陣列中
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }
}

public abstract class AbstractSelector extends Selector {
    protected abstract SelectionKey register(AbstractSelectableChannel ch,
                                         int ops, Object att);
}

public abstract class SelectorImpl extends AbstractSelector {
    protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        // 建立 SelectionKeyImpl 例項
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        synchronized (publicKeys) {
            implRegister(k);
        }
        k.interestOps(ops);
        return k;
    }
}

class EPollSelectorImpl extends SelectorImpl {
    protected void implRegister(SelectionKeyImpl ski) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        int fd = Integer.valueOf(ch.getFDVal());
        // 儲存 fd 和 SelectionKeyImpl 的對映關係
        fdToKey.put(fd, ski);
        
        pollWrapper.add(fd);
        // 將 SelectionKeyImpl 例項儲存到 keys 中(這裡的 keys 宣告在 SelectorImpl 類中),keys 集合可由 selector.keys() 方法獲取
        keys.add(ski);
    }
}

public class SelectionKeyImpl extends AbstractSelectionKey {
    public SelectionKey interestOps(int ops) {
        ensureValid();
        return nioInterestOps(ops);
    }
    
    public SelectionKey nioInterestOps(int ops) {
        if ((ops & ~channel().validOps()) != 0)
            throw new IllegalArgumentException();
        // 轉換並設定感興趣的事件
        channel.translateAndSetInterestOps(ops, this);
        // 設定 interestOps 變數
        interestOps = ops;
        return this;
    }
}

class SocketChannelImpl extends SocketChannel implements SelChImpl {
    public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        int newOps = 0;
        // 轉換事件
        if ((ops & SelectionKey.OP_READ) != 0)
            newOps |= PollArrayWrapper.POLLIN;
        if ((ops & SelectionKey.OP_WRITE) != 0)
            newOps |= PollArrayWrapper.POLLOUT;
        if ((ops & SelectionKey.OP_CONNECT) != 0)
            newOps |= PollArrayWrapper.POLLCONN;
        // 設定事件
        sk.selector.putEventOps(sk, newOps);
    }
}

class class EPollSelectorImpl extends SelectorImpl {
    public void putEventOps(SelectionKeyImpl ski, int ops) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        // 設定感興趣的事件
        pollWrapper.setInterest(ch.getFDVal(), ops);
    }
}

class EPollArrayWrapper {
    void setInterest(int fd, int mask) {
        synchronized (updateLock) {
            // 擴容 updateDescriptors 陣列,並存儲檔案描述符 fd
            int oldCapacity = updateDescriptors.length;
            if (updateCount == oldCapacity) {
                int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
                int[] newDescriptors = new int[newCapacity];
                System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
                updateDescriptors = newDescriptors;
            }
            updateDescriptors[updateCount++] = fd;
    
            // events are stored as bytes for efficiency reasons
            byte b = (byte)mask;
            assert (b == mask) && (b != KILLED);
            // 儲存事件
            setUpdateEvents(fd, b, false);
        }
    }
    
    private void setUpdateEvents(int fd, byte events, boolean force) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            if ((eventsLow[fd] != KILLED) || force) {
                eventsLow[fd] = events;
            }
        } else {
            Integer key = Integer.valueOf(fd);
            if (!isEventsHighKilled(key) || force) {
                eventsHigh.put(key, Byte.valueOf(events));
            }
        }
    }
}

到 setUpdateEvents 這個方法,整個呼叫棧就結束了。但是我們並未在呼叫棧中看到呼叫 epoll_ctl 函式的地方,也就是說,通道註冊時,並不會立即呼叫 epoll_ctl,而是先將事件集合 events 存放在 eventsLow。至於 epoll_ctl 函式何時呼叫的,需要大家繼續往下看了。

 2.4 選擇過程

 2.4.1 選擇方法

Selector 包含3種不同功能的選擇方法,分別如下:

  • int select()
  • int select(long timeout)
  • int selectNow()

select() 是一個阻塞方法,僅在至少一個通道處於就緒狀態時才返回。
select(long timeout) 同樣也是阻塞方法,不過可對該方法設定超時時間(timeout > 0),使得執行緒不會被一直阻塞。如果 timeout = 0,會一直阻塞執行緒。
selectNow() 為非阻塞方法,呼叫後立即返回。

以上3個方法均返回 int 型別值,表示每次呼叫 select 或 selectNow 方法後,新就緒通道的數量。如果某個通道在上一次呼叫 select 方法時就已經處於就緒狀態,但並未將該通道對應的 SelectionKey 物件從 selectedKeys 集合中移除。假設另一個的通道在本次呼叫 select 期間處於就緒狀態,此時,select 返回1,而不是2。

 2.4.2 選擇過程

選擇方法用起來雖然簡單,但方法之下隱藏的邏輯還是比較複雜的。大致分為下面幾個步驟:

  1. 檢查已取消鍵集合 cancelledKeys 是否為空,不為空則將 cancelledKeys 的鍵從 keys 和 selectedKeys 中移除,並將鍵和通道登出。
  2. 呼叫作業系統的 epoll_ctl 函式將通道感興趣的事件註冊到 epoll 例項中
  3. 呼叫作業系統的 epoll_wait 函式監聽事件
  4. 再次執行步驟1
  5. 更新 selectedKeys 集合,並返回就緒通道數量

上面五個步驟對應於 EPollSelectorImpl 類中 doSelect 方法的邏輯,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    // 處理已取消鍵集合,對應步驟1
    processDeregisterQueue();
    try {
        begin();
        // select 方法的核心,對應步驟2和3
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    // 處理已取消鍵集合,對應步驟4
    processDeregisterQueue();
    
    // 更新 selectedKeys 集合,並返回就緒通道數量,對應步驟5
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

接下來,我們按照上面的步驟順序去分析程式碼實現。先來看看步驟1對應的程式碼:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
+----SelectorImpl.java
void processDeregisterQueue() throws IOException {
    // Precondition: Synchronized on this, keys, and selectedKeys
    Set<SelectionKey> cks = cancelledKeys();
    synchronized (cks) {
        if (!cks.isEmpty()) {
            Iterator<SelectionKey> i = cks.iterator();
            // 遍歷 cancelledKeys,執行登出操作
            while (i.hasNext()) {
                SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                try {
                    // 執行登出邏輯
                    implDereg(ski);
                } catch (SocketException se) {
                    throw new IOException("Error deregistering key", se);
                } finally {
                    i.remove();
                }
            }
        }
    }
}

+----EPollSelectorImpl.java
protected void implDereg(SelectionKeyImpl ski) throws IOException {
    assert (ski.getIndex() >= 0);
    SelChImpl ch = ski.channel;
    int fd = ch.getFDVal();
    // 移除 fd 和選擇鍵鍵的對映關係
    fdToKey.remove(Integer.valueOf(fd));
    // 從 epoll 例項中刪除事件
    pollWrapper.remove(fd);
    ski.setIndex(-1);
    
    // 從 keys 和 selectedKeys 中移除選擇鍵
    keys.remove(ski);
    selectedKeys.remove(ski);
    
    // 登出選擇鍵
    deregister((AbstractSelectionKey)ski);
    
    // 登出通道
    SelectableChannel selch = ski.channel();
    if (!selch.isOpen() && !selch.isRegistered())
        ((SelChImpl)selch).kill();
}

上面的程式碼程式碼邏輯不是很複雜,首先是獲取 cancelledKeys 集合,然後遍歷集合,並對每個選擇鍵及其對應的通道執行登出操作。接下來再來看看步驟2和3對應的程式碼,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
+----EPollArrayWrapper.java
int poll(long timeout) throws IOException {
    // 呼叫 epoll_ctl 函式註冊事件,對應步驟3
    updateRegistrations();
    
    // 呼叫 epoll_wait 函式等待事件發生,對應步驟4
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

/**
 * Update the pending registrations.
 */
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            // 獲取 fd 和 events,這兩個值在呼叫 register 方法時被儲存到陣列中
            int fd = updateDescriptors[j];
            short events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;

            if (events != KILLED) {
                // 確定 opcode 的值
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    // 註冊事件
                    epollCtl(epfd, opcode, fd, events);
                    // 設定 fd 的註冊狀態
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
    
    // 下面兩個均是 native 方法
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
}

看到 updateRegistrations 方法的實現,大家現在知道 epoll_ctl 這個函式是在哪裡呼叫的了。在 3.2 節通道註冊的結尾給大家埋了一個疑問,這裡就是答案了。註冊通道實際上只是先將事件收集起來,等呼叫 select 方法時,在一起通過 epoll_ctl 函式將事件註冊到 epoll 例項中。

上面 epollCtl 和 epollWait 方法是 native 型別的,接下來我們再來看看這兩個方法是如何實現的。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
+----EPollArrayWrapper.c
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // 呼叫 epoll_ctl 註冊事件
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}

JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        // 呼叫 epoll_wait 等待事件
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

上面的C程式碼沒什麼複雜的邏輯,這裡就不多說了。如果大家對 epoll_ctl 和 epoll_wait 函式不瞭解,可以參考 Linux man-page。關於 epoll 的示例,也可以參考我的另一篇文章“基於epoll實現簡單的web伺服器”

說完步驟2和3對應的程式碼,接下來再來說說步驟4和5。由於步驟4和步驟1是一樣的,這裡不再贅述。最後再來說說步驟5的邏輯。程式碼如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
+----EPollSelectorImpl.java
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i=0; i<entries; i++) {
        /* 從 pollWrapper 成員變數的 pollArray 中獲取檔案描述符,
         * pollArray 中的資料由 epoll_wait 設定
         */
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            // 從 pollArray 中獲取就緒事件集合
            int rOps = pollWrapper.getEventOps(i);
            
            /* 如果 selectedKeys 已包含選擇鍵,則選擇鍵必須由新的事件發生時,
             * 才會將 numKeysUpdated + 1
             */ 
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                // 轉換並設定就緒事件集合
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    // 更新 selectedKeys 集合,並將 numKeysUpdated + 1
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    
    // 返回 numKeysUpdated
    return numKeysUpdated;
}

+----SocketChannelImpl.java
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
    int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
    int oldOps = sk.nioReadyOps();
    int newOps = initialOps;

    if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
        return false;
    }

    if ((ops & (PollArrayWrapper.POLLERR
                | PollArrayWrapper.POLLHUP)) != 0) {
        newOps = intOps;
        sk.nioReadyOps(newOps);
        // No need to poll again in checkConnect,
        // the error will be detected there
        readyToConnect = true;
        return (newOps & ~oldOps) != 0;
    }

    /* 
     * 轉換事件
     */
    if (((ops & PollArrayWrapper.POLLIN) != 0) &&
        ((intOps & SelectionKey.OP_READ) != 0) &&
        (state == ST_CONNECTED))
        newOps |= SelectionKey.OP_READ;

    if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
        ((intOps & SelectionKey.OP_CONNECT) != 0) &&
        ((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
        newOps |= SelectionKey.OP_CONNECT;
        readyToConnect = true;
    }

    if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
        ((intOps & SelectionKey.OP_WRITE) != 0) &&
        (state == ST_CONNECTED))
        newOps |= SelectionKey.OP_WRITE;

    // 設定事件
    sk.nioReadyOps(newOps);
    
    // 如果新的就緒事件和老的就緒事件不相同,則返回true,否則返回 false
    return (newOps & ~oldOps) != 0;
}

上面就是步驟5的邏輯了,簡單總結一下。首先是獲取就緒通道數量,然後再獲取這些就緒通道對應的檔案描述符 fd,以及就緒事件集合 rOps。之後呼叫 translateAndSetReadyOps 轉換並設定就緒事件集合。最後,將選擇鍵新增到 selectedKeys 集合中,並累加 numKeysUpdated 值,之後返回該值。

以上就是選擇過程的程式碼講解,貼了不少程式碼,可能不太好理解。Java NIO 和作業系統介面關聯比較大,所以在學習 NIO 相關原理時,也應該去了解諸如 epoll 等系統呼叫的知識。沒有這些背景知識,很多東西看起來不太好懂。好了,本節到此結束。

 2.5 模板程式碼

使用 NIO 選擇器程式設計時,主幹程式碼的結構一般比較固定。所以把主幹程式碼寫好後,就可以往裡填業務程式碼了。下面貼一個服務端的模板程式碼,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8080));
ssc.configureBlocking(false);

Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);

while(true) {
    int readyNum = selector.select();
    if (readyNum == 0) {
        continue;
    }

    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    Iterator<SelectionKey> it = selectedKeys.iterator();
    
    while(it.hasNext()) {
        SelectionKey key = it.next();
        
        if(key.isAcceptable()) {
            // 接受連線
        } else if (key.isReadable()) {
            // 通道可讀
        } else if (key.isWritable()) {
            // 通道可寫
        }
        
        it.remove();
    }
}

 2.6 例項演示

原本打算將示例演示的程式碼放在本節中展示,奈何文章篇幅已經很大了,所以決定把本節的內容獨立成文。在下一篇文章中,我將會演示使用 Java NIO 完成一個簡單的 HTTP 伺服器。這裡先貼張效果圖,如下:

tinyhttpd_w

 3.總結

到這裡,本文差不多就要結束了。原本只是打算簡單說說 Selector 的用法,然後再寫一份例項程式碼。但是後來發現這樣寫顯得比較空洞,沒什麼深度。所以後來翻了一下 Selector 的原始碼,大致理解了 Selector 的邏輯,然後就有了上面的分析。不過 Selector 的邏輯並不止我上面所說的那些,還有一些內容我現在還沒看,所以就沒有講。對於已寫出來的分析,由於我個人水平有限,難免會有錯誤。如果有錯誤,也歡迎大家指出來,共同進步!

好了,本文到此結束,感謝大家的閱讀。

 參考

 附錄

文中貼的一些程式碼是沒有包含在 JDK src.zip 包裡的,這裡單獨列舉出來,方便大家查詢。

檔名 路徑
DefaultSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java
EPollSelectorProvider.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java
SelectorImpl.java jdk/src/share/classes/sun/nio/ch/SelectorImpl.java
EPollSelectorImpl.java jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java
EPollArrayWrapper.java jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
SelectionKeyImpl.java jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java
SocketChannelImpl.java jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java
EPollArrayWrapper.c jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

from: http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/