網路IO-多路複用
前言
1 多路複用就是os提供了系統呼叫,能一次性知道多個fd的狀態
2 大家都說多路複用的實現有3種 當然下述即使是輪詢也是發生在kernel中 效率肯定遠遠大於在核心外的輪詢
- select 陣列容量限制fd數量的無差別輪詢O(N) 發生記憶體拷貝
- poll 沒有fd數量限制的連結串列無差別輪詢O(N) 發生記憶體拷貝
- epoll 近乎O(1)時間複雜度
根據已經有的理論知識和猜測跟著原始碼走一遍(首先承認我的無知,在此之前都是背的八股文,甚至壓根不知道macos系統kqueue的實現...)
1 Demo
package debug.io.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.util.Iterator; import java.util.Set; /** * <p>{@link Selector}就是複用器 在java nio下的一種對多路複用的具體實現<ul> * <li>select</li> * <li>poll</li> * <li>epoll</li> * <li>kqueue</li> * </ul></p> * @since 2022/5/21 * @author dingrui */ public class SelectorTest { public static void main(String[] args) throws IOException { // 服務端建立 監聽埠 ServerSocketChannel channel = ServerSocketChannel.open(); channel.configureBlocking(false); channel.bind(new InetSocketAddress(9001)); // 建立一個複用器 Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器 int ret = selector.select();// 阻塞呼叫 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel // 以下就是根據業務展開了 Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()){ SelectionKey sk = it.next(); it.remove(); sk.isValid(); sk.isWritable(); sk.isReadable(); sk.isAcceptable(); } } }
老規矩,用實際的程式碼帶著問題驅動
跟進原始碼之前先猜測,真正關注的就下面4行,selector.selectedKeys()
就已經可以獲取到有活動的channel了,說明多路複用器的多路複用
功能體現在前3行
// 建立一個複用器 Selector selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器 int ret = selector.select();// 阻塞呼叫 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel
先猜測一下
- 建立複用器 -> 無非就是建立一個java bean,可能會涉及到屬性的初始化和方法塊的執行
- 註冊 -> 見名知意,把channel繫結到複用器上,入參還有一個事件,那麼這個引數的作用要麼是在第3行,要麼是在第4行
- select()方法 -> 這一行程式碼應該就是向os獲取fd狀態的核心
- 如果實現方式是八股文說的select或者poll,那麼os需要一個fd的集合,客戶端沒有顯示傳進去,那麼就是在這個方法內部獲取了連向這個channel的所有socket
- 如果實現方式是八股文上說的epoll,那麼實現細節也就肯定這個方法內部
2 原始碼
就跟著上面4行程式碼,從結果開始,從下往上的方式跟蹤
2.1 Set selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel
// SelectorImpl
// The set of keys with data ready for an operation
private final Set<SelectionKey> selectedKeys; // 事件就緒的kevent對應的selectionKey <-> publicSelectedKeys
// Public views of the key sets
private final Set<SelectionKey> publicKeys; // Immutable
private final Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition // Selector#selectedKeys()返回的就是這個集合 <->selectedKeys
@Override
public final Set<SelectionKey> selectedKeys() {
this.ensureOpen();
return this.publicSelectedKeys; // 返回了一個集合作為結果 那麼只要關注這個集合的生成和新增過程就行
}
這個方法很簡單,就是直接將一個集合作為結果返回出去了,所以就將注意力放在這個集合的新增上就行
2.1.1 跟蹤SelectorImpl#publicSelectedKeys
protected SelectorImpl(SelectorProvider sp) { // 屬性初始化工作
super(sp);
this.keys = ConcurrentHashMap.newKeySet();
this.selectedKeys = new HashSet<>();
this.publicKeys = Collections.unmodifiableSet(keys);
this.publicSelectedKeys = Util.ungrowableSet(selectedKeys); // 這個就是select()的結果 又自己實現了資料結構 不支援寫 相當於讀寫分離的設計 本質上引用指標指向的是selectedKeys
}
看看這個Util.ungrowableSet(Set)
做了什麼
static <E> Set<E> ungrowableSet(final Set<E> s) {
return new Set<E>() {
public int size() { return s.size(); }
public boolean isEmpty() { return s.isEmpty(); }
public boolean contains(Object o) { return s.contains(o); }
public Object[] toArray() { return s.toArray(); }
public <T> T[] toArray(T[] a) { return s.toArray(a); }
public String toString() { return s.toString(); }
public Iterator<E> iterator() { return s.iterator(); }
public boolean equals(Object o) { return s.equals(o); }
public int hashCode() { return s.hashCode(); }
public void clear() { s.clear(); }
public boolean remove(Object o) { return s.remove(o); }
public boolean containsAll(Collection<?> coll) {
return s.containsAll(coll);
}
public boolean removeAll(Collection<?> coll) {
return s.removeAll(coll);
}
public boolean retainAll(Collection<?> coll) {
return s.retainAll(coll);
}
public boolean add(E o){
throw new UnsupportedOperationException();
}
public boolean addAll(Collection<? extends E> coll) {
throw new UnsupportedOperationException();
}
};
}
就是自定義了一個Set
的實現,不支援add
方法,其他直接呼叫入參的集合方法
相當於一份資料兩處使用,類似讀寫分離,這個資料的指標指向的是SelectorImpl#selectedKeys
那麼比較清晰的是
- 客戶端在一個
while()
迴圈裡面 通過迭代器拿到一個SelectionKey
例項後就通過Set#remove
從SelectorImpl#selectedKeys
裡面移除掉 - 每次進入
Selector#select()
方法,都會往這個集合中新增資料,所以下面就只要關注這個集合的Set#add()
操作就行
2.2 int ret = selector.select();// 阻塞呼叫
// Selector
public abstract int select() throws IOException; // macos系統 一次系統呼叫通過kqueue將就緒事件fd對應的SelectionKey都放到集合SelectorImpl#selectedKeys
// SelectorImpl
@Override
public final int select() throws IOException {
return this.lockAndDoSelect(null, -1);
}
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{ // 從Selector#select()過來 action=null timeout=-1
synchronized (this) { // 管程鎖
this.ensureOpen(); // 通過AbstractSelector#selectorOpen標識判斷
if (inSelect) // 執行緒重入判斷
throw new IllegalStateException("select in progress");
inSelect = true; // 在同步程式碼塊中修改值 釋放鎖後修改回去 以此為執行緒重入的依據
try {
synchronized (this.publicSelectedKeys) {
return this.doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException; // 實現留給子類關注 macos的實現類在KQueueSelectorImpl
這個地方稍微熟悉設計模式就知道一定是模板方法,果斷看一下類圖
![image-20220521200252255](../img/:Users:dingrui:Library:Application Support:typora-user-images:image-20220521200252255.png)
SelectorImpl
這個抽象類有兩處實現doSelect
,那麼一般情況下會搭配策略模式去指定具體的實現,但是有想到八股文上說多路複用的實現跟作業系統有關係,那麼這個地方有可能是通過SPI或者類似的方式實現的擴充套件,但是這個方法呼叫是無參的,就有可能是在此基礎上封裝了一個面向客戶端的類,使用工廠模式去呼叫具體實現(先按下不表,以後填坑todo)
通過斷點debug,發現macos上的實現是KQueueSelectorImpl
2.2.1 KQueueSelectorImpl#doSelect
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{ // 從Selector#select()跟進來 action=null timeout=-1
assert Thread.holdsLock(this); // 這個方法是實現了SelectorImpl#deoSelect()預留的模板鉤子 入口在SelectorImpl#lockAndDoSelect() 進入之前外面包了synchronized 這個地方再判斷一下當前執行執行緒是否持有了管程鎖
long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout // to表示在有超時機制下還有多行時間可用 timeout=-1
boolean blocking = (to != 0); // true
boolean timedPoll = (to > 0); // false
int numEntries;
this.processUpdateQueue();
this.processDeregisterQueue();
try {
this.begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0; // 0
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); // 本地方法 應該是類似於epoll的一次os sc 阻塞等待系統呼叫的返回
if (numEntries == IOStatus.INTERRUPTED && timedPoll) { // 系統呼叫中斷(有可能是超時導致的) 如果客戶端設定了超時限制 判定果真超時就將返回值修改為對應的結果
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED); // os sc返回被中斷狀態 異常情況下單獨判定是否是超時導致的
assert IOStatus.check(numEntries);
} finally {
end(blocking);
} // 截止目前 從核心拿到了numEntries
this.processDeregisterQueue();
return this.processEvents(numEntries, action); // action=null
}
整個方法都沒有發現對SelectorImpl#selectedKeys
的新增操作,跟蹤this.processEvents(numEntries, action)
private int processEvents(int numEntries, Consumer<SelectionKey> action)
throws IOException
{
assert Thread.holdsLock(this);
int numKeysUpdated = 0;
boolean interrupted = false;
// A file descriptor may be registered with kqueue with more than one
// filter and so there may be more than one event for a fd. The poll
// count is incremented here and compared against the SelectionKey's
// "lastPolled" field. This ensures that the ready ops is updated rather
// than replaced when a file descriptor is polled by both the read and
// write filter.
this.pollCount++;
for (int i = 0; i < numEntries; i++) {
long kevent = KQueue.getEvent(this.pollArrayAddress, i); // 從連續的記憶體空間上找到對應的kevent
int fd = KQueue.getDescriptor(kevent); // kevent對應的fd
if (fd == fd0) {
interrupted = true;
} else {
SelectionKeyImpl ski = fdToKey.get(fd); // 應該是將fd封裝成java物件 使用者層對物件的讀寫操作最終對映到os就是那種fd進行操作
if (ski != null) {
int rOps = 0;
short filter = KQueue.getFilter(kevent);
if (filter == EVFILT_READ) {
rOps |= Net.POLLIN;
} else if (filter == EVFILT_WRITE) {
rOps |= Net.POLLOUT;
}
int updated = super.processReadyEvents(rOps, ski, action); // rOps=應該是kqueue關注的kevent事件 action=null 過濾出os核心給出的kevent發生過事件變更的
if (updated > 0 && ski.lastPolled != pollCount) {
numKeysUpdated++;
ski.lastPolled = pollCount;
}
}
}
}
if (interrupted) {
clearInterrupt();
}
return numKeysUpdated;
}
重點mark一下
- fdToKey是一個map,存放著fd和SelectionKey的對映快取
- rOps是根據OS kqueue的資料結構kevent計算出來的,而每個kevent就是對應著fd,也就是socket
繼續跟進super.processReadyEvents
()方法
protected final int processReadyEvents(int rOps,
SelectionKeyImpl ski,
Consumer<SelectionKey> action) { // 返回值要麼是0 要麼是1 rOps是os從kqueue的kevent中拿來的
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else { // action=null
assert Thread.holdsLock(publicSelectedKeys);
if (this.selectedKeys.contains(ski)) { // 事件就緒的kevent對應的fd(對映成SelectionKey)已經被快取過了
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps); // 可能會去修改ski#readyOps
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { // 與操作 readOps是根據rOps來的 interestOps是channel.register()的時候傳進來的肯定不是0 也就是說決定socket時候返回給客戶端的因素在於這個rOps
this.selectedKeys.add(ski); // 處於就緒事件的selectionKey
return 1;
}
}
}
return 0;
}
終於看到關係的那個集合了this.selectedKeys.add(ski)
,action是一個函式式介面,姑且當作null來處理,不影響
那麼這個集合的新增只有一個條件if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0)
這是一個與操作,首先後半部分肯定不為0,傳進來的值是channel.register(selector, SelectionKey.OP_ACCEPT)
等於16,因此只要前半部分值不是0就是一個候選人
因此,問題來了,這個skio.niReadyOps()
的返回值是什麼,誰會去改變它,跟核心返回socket狀態有什麼關係
// SelectionKeyImpl
private volatile int readyOps;
這個值僅僅是SelectionKeyIml
的一個屬性
好運是這個屬性的修改只有一個方法
// SelectionKeyImpl
public void nioReadyOps(int ops) {
this.readyOps = ops;
}
跟蹤下去發現這個值就是ski.translateAndSetReadyOps(rOps)
這行程式碼可能會去做的修改
// ServerSocketChannelImpl
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
int intOps = ski.nioInterestOps();
int oldOps = ski.nioReadyOps();
int newOps = initialOps;
if ((ops & Net.POLLNVAL) != 0) {
// This should only happen if this channel is pre-closed while a
// selection operation is in progress
// ## Throw an error if this channel has not been pre-closed
return false;
}
if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
newOps = intOps;
ski.nioReadyOps(newOps);
return (newOps & ~oldOps) != 0;
}
if (((ops & Net.POLLIN) != 0) &&
((intOps & SelectionKey.OP_ACCEPT) != 0))
newOps |= SelectionKey.OP_ACCEPT;
ski.nioReadyOps(newOps); // 修改readyOps為newOps
return (newOps & ~oldOps) != 0;
}
那現在的結論就是: rOps
這個引數決定著哪些socket是有狀態標記返回給客戶端的
之前mark了2個東西,現在派上了用場
- fdToKey是一個map,存放著fd和SelectionKey的對映快取
- rOps是根據OS kqueue的資料結構kevent計算出來的,而每個kevent就是對應著fd,也就是socket
梳理一下,到目前為止,可以得到的結論是
藉助於kqueue的api,一次KQueue.poll()
對應的系統呼叫獲取到一個long型數字,根據開闢好的一個數組首地址,依次輪詢這個long,其實就是獲取到kqueue給我們的有狀態的kevent,我們再根據kevent獲取到fd
只要能根據fd獲取到所謂的SelectionKeyImpl就一切都解了,問題又回到了SelectionKeyImpl ski = this.fdToKey.get(fd)
這行程式碼,看呼叫棧可以發現這個對映關係是channel.register()
的時候新增的
2.3 channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器
2.3.1 AbstractSelectableChannel
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{ // ServerSocketChannel.register(selector, SelectionKey.OP_ACCEPT)跟蹤進來的 att=null
if ((ops & ~validOps()) != 0) // 引數校驗 確保複用器要關注的事件是有效的
throw new IllegalArgumentException();
if (!isOpen()) // 引數校驗 確保channel是處於監聽狀態的
throw new ClosedChannelException();
synchronized (regLock) {
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = this.findKey(sel); // k代表的是註冊在sel這個複用器上的channel
if (k != null) { // channel已經註冊過了複用器 這個方法再次被呼叫可能意味著修改某些引數
k.attach(att);
k.interestOps(ops);
} else { // channel向複用器上註冊
// New registration
k = ((AbstractSelector)sel).register(this, ops, att); // channel註冊複用器成功後返回的SelectionKey相當於是對channel的封裝
this.addKey(k); // 把複用器註冊成功的channel快取到keys集合中
}
return k;
}
}
}
首次進來,肯定是還沒註冊,自然而然k = ((AbstractSelector)sel).register(this, ops, att)
2.3.2 SelectionKeyImpl
@Override
public SelectionKey interestOps(int ops) {
ensureValid();
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
if (ops != oldOps) {
this.selector.setEventOps(this); // 把複用器需要關注的事件快取起來 放到KQueueSelectorImpl#updateKeys中
}
return this;
}
2.3.3 KQueueSelectorImpl
@Override
public void setEventOps(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
this.updateKeys.addLast(ski); // channel.register(selector, SelectionKey.OP_ACCEPT)觸發的 重要 後面找fd跟ski對映之前要用到
}
}
到這已經出現了一個比較重要的操作了this.updateKeys.addLast(ski)
,在找fd跟ski的對映之前會有一個動作從updateKeys彈出
這個動作發生在KQueueSelectorImpl#doSelect()
中的this.processUpdateQueue()
2.4 Selector selector = Selector.open()
KQueueSelectorImpl(SelectorProvider sp) throws IOException { // sp是KQueueSelectorProvider的例項
super(sp);
this.kqfd = KQueue.create(); // 應該是對應的一次系統呼叫 The kqueue() system call allocates a kqueue file descriptor. This file descriptor provides a generic method of notifying the user when a kernel event (kevent) happens or a condition holds, based on the results of small pieces of ker-nel code termed filters
this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS); // os實現kqueue定義了kevent資料結構 申請指定數量的kevent記憶體
try {
long fds = IOUtil.makePipe(false);
this.fd0 = (int) (fds >>> 32); // 高32位 讀
this.fd1 = (int) fds; // 低32位 寫
} catch (IOException ioe) {
KQueue.freePollArray(pollArrayAddress);
FileDispatcherImpl.closeIntFD(kqfd);
throw ioe;
}
// register one end of the socket pair for wakeups
KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); // os sc 這個地方猜測應該是呼叫的這個int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
}
這邊就是最簡單的了,做了一些屬性賦值的準備工作
核心就是在構造方法中體現出了kqueue的呼叫,應該是發生了兩次系統呼叫
- kqueue()
- kevent()
以下是從手冊中查到的資料
3 總結
關於多路複用器的一個具體實現java中的Selector就結束了
整個的實現步驟就將文件逆著看
這個複用器的實現依賴的是作業系統macos的kqueue(kqueue我也是第一次見到 具體os實現以後學到了再回來填坑todo)