1. 程式人生 > 其它 >網路IO-多路複用

網路IO-多路複用

前言

1 多路複用就是os提供了系統呼叫,能一次性知道多個fd的狀態

2 大家都說多路複用的實現有3種 當然下述即使是輪詢也是發生在kernel中 效率肯定遠遠大於在核心外的輪詢

  • select 陣列容量限制fd數量的無差別輪詢O(N) 發生記憶體拷貝
  • poll 沒有fd數量限制的連結串列無差別輪詢O(N) 發生記憶體拷貝
  • epoll 近乎O(1)時間複雜度

根據已經有的理論知識和猜測跟著原始碼走一遍(首先承認我的無知,在此之前都是背的八股文,甚至壓根不知道macos系統kqueue的實現...)

1 Demo

package debug.io.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * <p>{@link Selector}就是複用器 在java nio下的一種對多路複用的具體實現<ul>
 *     <li>select</li>
 *     <li>poll</li>
 *     <li>epoll</li>
 *     <li>kqueue</li>
 * </ul></p>
 * @since 2022/5/21
 * @author dingrui
 */
public class SelectorTest {

    public static void main(String[] args) throws IOException {
        // 服務端建立 監聽埠
        ServerSocketChannel channel = ServerSocketChannel.open();
        channel.configureBlocking(false);
        channel.bind(new InetSocketAddress(9001));

        // 建立一個複用器
        Selector selector = Selector.open();
        channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器
        int ret = selector.select();// 阻塞呼叫
        Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel

        // 以下就是根據業務展開了
        Iterator<SelectionKey> it = selectionKeys.iterator();
        while (it.hasNext()){
            SelectionKey sk = it.next();
            it.remove();
            sk.isValid();
            sk.isWritable();
            sk.isReadable();
            sk.isAcceptable();
        }
    }
}

老規矩,用實際的程式碼帶著問題驅動

跟進原始碼之前先猜測,真正關注的就下面4行,selector.selectedKeys()就已經可以獲取到有活動的channel了,說明多路複用器的多路複用功能體現在前3行

// 建立一個複用器
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器
int ret = selector.select();// 阻塞呼叫
Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel

先猜測一下

  • 建立複用器 -> 無非就是建立一個java bean,可能會涉及到屬性的初始化和方法塊的執行
  • 註冊 -> 見名知意,把channel繫結到複用器上,入參還有一個事件,那麼這個引數的作用要麼是在第3行,要麼是在第4行
  • select()方法 -> 這一行程式碼應該就是向os獲取fd狀態的核心
    • 如果實現方式是八股文說的select或者poll,那麼os需要一個fd的集合,客戶端沒有顯示傳進去,那麼就是在這個方法內部獲取了連向這個channel的所有socket
    • 如果實現方式是八股文上說的epoll,那麼實現細節也就肯定這個方法內部

2 原始碼

就跟著上面4行程式碼,從結果開始,從下往上的方式跟蹤

2.1 Set selectionKeys = selector.selectedKeys(); // 拿到事件就緒的channel

// SelectorImpl
  
// The set of keys with data ready for an operation
private final Set<SelectionKey> selectedKeys; // 事件就緒的kevent對應的selectionKey <-> publicSelectedKeys

// Public views of the key sets
private final Set<SelectionKey> publicKeys;             // Immutable
private final Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition // Selector#selectedKeys()返回的就是這個集合 <->selectedKeys


@Override
public final Set<SelectionKey> selectedKeys() {
  this.ensureOpen();
  return this.publicSelectedKeys; // 返回了一個集合作為結果 那麼只要關注這個集合的生成和新增過程就行
}

這個方法很簡單,就是直接將一個集合作為結果返回出去了,所以就將注意力放在這個集合的新增上就行

2.1.1 跟蹤SelectorImpl#publicSelectedKeys

protected SelectorImpl(SelectorProvider sp) { // 屬性初始化工作
  super(sp);
  this.keys = ConcurrentHashMap.newKeySet();
  this.selectedKeys = new HashSet<>();
  this.publicKeys = Collections.unmodifiableSet(keys);
  this.publicSelectedKeys = Util.ungrowableSet(selectedKeys); // 這個就是select()的結果 又自己實現了資料結構 不支援寫 相當於讀寫分離的設計 本質上引用指標指向的是selectedKeys
}

看看這個Util.ungrowableSet(Set)做了什麼

static <E> Set<E> ungrowableSet(final Set<E> s) {
        return new Set<E>() {

                public int size()                 { return s.size(); }
                public boolean isEmpty()          { return s.isEmpty(); }
                public boolean contains(Object o) { return s.contains(o); }
                public Object[] toArray()         { return s.toArray(); }
                public <T> T[] toArray(T[] a)     { return s.toArray(a); }
                public String toString()          { return s.toString(); }
                public Iterator<E> iterator()     { return s.iterator(); }
                public boolean equals(Object o)   { return s.equals(o); }
                public int hashCode()             { return s.hashCode(); }
                public void clear()               { s.clear(); }
                public boolean remove(Object o)   { return s.remove(o); }

                public boolean containsAll(Collection<?> coll) {
                    return s.containsAll(coll);
                }
                public boolean removeAll(Collection<?> coll) {
                    return s.removeAll(coll);
                }
                public boolean retainAll(Collection<?> coll) {
                    return s.retainAll(coll);
                }

                public boolean add(E o){
                    throw new UnsupportedOperationException();
                }
                public boolean addAll(Collection<? extends E> coll) {
                    throw new UnsupportedOperationException();
                }

        };
    }

就是自定義了一個Set的實現,不支援add方法,其他直接呼叫入參的集合方法

相當於一份資料兩處使用,類似讀寫分離,這個資料的指標指向的是SelectorImpl#selectedKeys

那麼比較清晰的是

  • 客戶端在一個while()迴圈裡面 通過迭代器拿到一個SelectionKey例項後就通過Set#removeSelectorImpl#selectedKeys裡面移除掉
  • 每次進入Selector#select()方法,都會往這個集合中新增資料,所以下面就只要關注這個集合的Set#add()操作就行

2.2 int ret = selector.select();// 阻塞呼叫

// Selector
public abstract int select() throws IOException; // macos系統 一次系統呼叫通過kqueue將就緒事件fd對應的SelectionKey都放到集合SelectorImpl#selectedKeys

// SelectorImpl
@Override
public final int select() throws IOException {
  return this.lockAndDoSelect(null, -1);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
  throws IOException
{ // 從Selector#select()過來 action=null timeout=-1
  synchronized (this) { // 管程鎖
    this.ensureOpen(); // 通過AbstractSelector#selectorOpen標識判斷
    if (inSelect) // 執行緒重入判斷
      throw new IllegalStateException("select in progress");
    inSelect = true; // 在同步程式碼塊中修改值 釋放鎖後修改回去 以此為執行緒重入的依據
    try {
      synchronized (this.publicSelectedKeys) {
        return this.doSelect(action, timeout);
      }
    } finally {
      inSelect = false;
    }
  }
  
  protected abstract int doSelect(Consumer<SelectionKey> action, long timeout)
    throws IOException; // 實現留給子類關注 macos的實現類在KQueueSelectorImpl

這個地方稍微熟悉設計模式就知道一定是模板方法,果斷看一下類圖

![image-20220521200252255](../img/:Users:dingrui:Library:Application Support:typora-user-images:image-20220521200252255.png)

SelectorImpl這個抽象類有兩處實現doSelect,那麼一般情況下會搭配策略模式去指定具體的實現,但是有想到八股文上說多路複用的實現跟作業系統有關係,那麼這個地方有可能是通過SPI或者類似的方式實現的擴充套件,但是這個方法呼叫是無參的,就有可能是在此基礎上封裝了一個面向客戶端的類,使用工廠模式去呼叫具體實現(先按下不表,以後填坑todo)

通過斷點debug,發現macos上的實現是KQueueSelectorImpl

2.2.1 KQueueSelectorImpl#doSelect

    @Override
    protected int doSelect(Consumer<SelectionKey> action, long timeout)
        throws IOException
    { // 從Selector#select()跟進來 action=null timeout=-1
        assert Thread.holdsLock(this); // 這個方法是實現了SelectorImpl#deoSelect()預留的模板鉤子 入口在SelectorImpl#lockAndDoSelect() 進入之前外面包了synchronized 這個地方再判斷一下當前執行執行緒是否持有了管程鎖

        long to = Math.min(timeout, Integer.MAX_VALUE);  // max kqueue timeout // to表示在有超時機制下還有多行時間可用 timeout=-1
        boolean blocking = (to != 0); // true
        boolean timedPoll = (to > 0); // false

        int numEntries;
        this.processUpdateQueue();
        this.processDeregisterQueue();
        try {
            this.begin(blocking);

            do {
                long startTime = timedPoll ? System.nanoTime() : 0; // 0
                numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to); // 本地方法 應該是類似於epoll的一次os sc 阻塞等待系統呼叫的返回
                if (numEntries == IOStatus.INTERRUPTED && timedPoll) { // 系統呼叫中斷(有可能是超時導致的) 如果客戶端設定了超時限制 判定果真超時就將返回值修改為對應的結果
                    // timed poll interrupted so need to adjust timeout
                    long adjust = System.nanoTime() - startTime;
                    to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                    if (to <= 0) {
                        // timeout expired so no retry
                        numEntries = 0;
                    }
                }
            } while (numEntries == IOStatus.INTERRUPTED); // os sc返回被中斷狀態 異常情況下單獨判定是否是超時導致的
            assert IOStatus.check(numEntries);

        } finally {
            end(blocking);
        } // 截止目前 從核心拿到了numEntries
        this.processDeregisterQueue();
        return this.processEvents(numEntries, action); // action=null
    }

整個方法都沒有發現對SelectorImpl#selectedKeys的新增操作,跟蹤this.processEvents(numEntries, action)

    private int processEvents(int numEntries, Consumer<SelectionKey> action)
        throws IOException
    {
        assert Thread.holdsLock(this);

        int numKeysUpdated = 0;
        boolean interrupted = false;

        // A file descriptor may be registered with kqueue with more than one
        // filter and so there may be more than one event for a fd. The poll
        // count is incremented here and compared against the SelectionKey's
        // "lastPolled" field. This ensures that the ready ops is updated rather
        // than replaced when a file descriptor is polled by both the read and
        // write filter.
        this.pollCount++;

        for (int i = 0; i < numEntries; i++) {
            long kevent = KQueue.getEvent(this.pollArrayAddress, i); // 從連續的記憶體空間上找到對應的kevent
            int fd = KQueue.getDescriptor(kevent); // kevent對應的fd
            if (fd == fd0) {
                interrupted = true;
            } else {
                SelectionKeyImpl ski = fdToKey.get(fd); // 應該是將fd封裝成java物件 使用者層對物件的讀寫操作最終對映到os就是那種fd進行操作
                if (ski != null) {
                    int rOps = 0;
                    short filter = KQueue.getFilter(kevent);
                    if (filter == EVFILT_READ) {
                        rOps |= Net.POLLIN;
                    } else if (filter == EVFILT_WRITE) {
                        rOps |= Net.POLLOUT;
                    }
                    int updated = super.processReadyEvents(rOps, ski, action); // rOps=應該是kqueue關注的kevent事件 action=null 過濾出os核心給出的kevent發生過事件變更的
                    if (updated > 0 && ski.lastPolled != pollCount) {
                        numKeysUpdated++;
                        ski.lastPolled = pollCount;
                    }
                }
            }
        }

        if (interrupted) {
            clearInterrupt();
        }
        return numKeysUpdated;
    }

重點mark一下

  • fdToKey是一個map,存放著fd和SelectionKey的對映快取
  • rOps是根據OS kqueue的資料結構kevent計算出來的,而每個kevent就是對應著fd,也就是socket

繼續跟進super.processReadyEvents()方法

    protected final int processReadyEvents(int rOps,
                                           SelectionKeyImpl ski,
                                           Consumer<SelectionKey> action) { // 返回值要麼是0 要麼是1 rOps是os從kqueue的kevent中拿來的
        if (action != null) {
            ski.translateAndSetReadyOps(rOps);
            if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                action.accept(ski);
                ensureOpen();
                return 1;
            }
        } else { // action=null
            assert Thread.holdsLock(publicSelectedKeys);
            if (this.selectedKeys.contains(ski)) { // 事件就緒的kevent對應的fd(對映成SelectionKey)已經被快取過了
                if (ski.translateAndUpdateReadyOps(rOps)) {
                    return 1;
                }
            } else {
                ski.translateAndSetReadyOps(rOps); // 可能會去修改ski#readyOps
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { // 與操作 readOps是根據rOps來的 interestOps是channel.register()的時候傳進來的肯定不是0 也就是說決定socket時候返回給客戶端的因素在於這個rOps
                    this.selectedKeys.add(ski); // 處於就緒事件的selectionKey
                    return 1;
                }
            }
        }
        return 0;
    }

終於看到關係的那個集合了this.selectedKeys.add(ski),action是一個函式式介面,姑且當作null來處理,不影響

那麼這個集合的新增只有一個條件if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0)這是一個與操作,首先後半部分肯定不為0,傳進來的值是channel.register(selector, SelectionKey.OP_ACCEPT)等於16,因此只要前半部分值不是0就是一個候選人

因此,問題來了,這個skio.niReadyOps()的返回值是什麼,誰會去改變它,跟核心返回socket狀態有什麼關係

// SelectionKeyImpl

private volatile int readyOps;

這個值僅僅是SelectionKeyIml的一個屬性

好運是這個屬性的修改只有一個方法

// SelectionKeyImpl

public void nioReadyOps(int ops) {
        this.readyOps = ops;
    }

跟蹤下去發現這個值就是ski.translateAndSetReadyOps(rOps)這行程式碼可能會去做的修改

// ServerSocketChannelImpl

public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
        int intOps = ski.nioInterestOps();
        int oldOps = ski.nioReadyOps();
        int newOps = initialOps;

        if ((ops & Net.POLLNVAL) != 0) {
            // This should only happen if this channel is pre-closed while a
            // selection operation is in progress
            // ## Throw an error if this channel has not been pre-closed
            return false;
        }

        if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
            newOps = intOps;
            ski.nioReadyOps(newOps);
            return (newOps & ~oldOps) != 0;
        }

        if (((ops & Net.POLLIN) != 0) &&
            ((intOps & SelectionKey.OP_ACCEPT) != 0))
                newOps |= SelectionKey.OP_ACCEPT;

        ski.nioReadyOps(newOps); // 修改readyOps為newOps
        return (newOps & ~oldOps) != 0;
    }

那現在的結論就是: rOps這個引數決定著哪些socket是有狀態標記返回給客戶端的

之前mark了2個東西,現在派上了用場

  • fdToKey是一個map,存放著fd和SelectionKey的對映快取
  • rOps是根據OS kqueue的資料結構kevent計算出來的,而每個kevent就是對應著fd,也就是socket

梳理一下,到目前為止,可以得到的結論是

藉助於kqueue的api,一次KQueue.poll()對應的系統呼叫獲取到一個long型數字,根據開闢好的一個數組首地址,依次輪詢這個long,其實就是獲取到kqueue給我們的有狀態的kevent,我們再根據kevent獲取到fd

只要能根據fd獲取到所謂的SelectionKeyImpl就一切都解了,問題又回到了SelectionKeyImpl ski = this.fdToKey.get(fd)這行程式碼,看呼叫棧可以發現這個對映關係是channel.register()的時候新增的

2.3 channel.register(selector, SelectionKey.OP_ACCEPT); // 把channel註冊到複用器

2.3.1 AbstractSelectableChannel

    public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    { // ServerSocketChannel.register(selector, SelectionKey.OP_ACCEPT)跟蹤進來的 att=null
        if ((ops & ~validOps()) != 0) // 引數校驗 確保複用器要關注的事件是有效的
            throw new IllegalArgumentException();
        if (!isOpen()) // 引數校驗 確保channel是處於監聽狀態的
            throw new ClosedChannelException();
        synchronized (regLock) {
            if (isBlocking())
                throw new IllegalBlockingModeException();
            synchronized (keyLock) {
                // re-check if channel has been closed
                if (!isOpen())
                    throw new ClosedChannelException();
                SelectionKey k = this.findKey(sel); // k代表的是註冊在sel這個複用器上的channel
                if (k != null) { // channel已經註冊過了複用器 這個方法再次被呼叫可能意味著修改某些引數
                    k.attach(att);
                    k.interestOps(ops);
                } else { // channel向複用器上註冊
                    // New registration
                    k = ((AbstractSelector)sel).register(this, ops, att); // channel註冊複用器成功後返回的SelectionKey相當於是對channel的封裝
                    this.addKey(k); // 把複用器註冊成功的channel快取到keys集合中
                }
                return k;
            }
        }
    }

首次進來,肯定是還沒註冊,自然而然k = ((AbstractSelector)sel).register(this, ops, att)

2.3.2 SelectionKeyImpl

    @Override
    public SelectionKey interestOps(int ops) {
        ensureValid();
        if ((ops & ~channel().validOps()) != 0)
            throw new IllegalArgumentException();
        int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
        if (ops != oldOps) {
            this.selector.setEventOps(this); // 把複用器需要關注的事件快取起來 放到KQueueSelectorImpl#updateKeys中
        }
        return this;
    }

2.3.3 KQueueSelectorImpl

    @Override
    public void setEventOps(SelectionKeyImpl ski) {
        ensureOpen();
        synchronized (updateLock) {
            this.updateKeys.addLast(ski); // channel.register(selector, SelectionKey.OP_ACCEPT)觸發的 重要 後面找fd跟ski對映之前要用到
        }
    }

到這已經出現了一個比較重要的操作了this.updateKeys.addLast(ski),在找fd跟ski的對映之前會有一個動作從updateKeys彈出

這個動作發生在KQueueSelectorImpl#doSelect()中的this.processUpdateQueue()

2.4 Selector selector = Selector.open()

    KQueueSelectorImpl(SelectorProvider sp) throws IOException { // sp是KQueueSelectorProvider的例項
        super(sp);

        this.kqfd = KQueue.create(); // 應該是對應的一次系統呼叫 The kqueue() system call allocates a kqueue file descriptor.  This file descriptor provides a generic method of notifying the user when a kernel event (kevent) happens or a condition holds, based on the results of small pieces of ker-nel code termed filters
        this.pollArrayAddress = KQueue.allocatePollArray(MAX_KEVENTS); // os實現kqueue定義了kevent資料結構 申請指定數量的kevent記憶體

        try {
            long fds = IOUtil.makePipe(false);
            this.fd0 = (int) (fds >>> 32); // 高32位 讀
            this.fd1 = (int) fds; // 低32位 寫
        } catch (IOException ioe) {
            KQueue.freePollArray(pollArrayAddress);
            FileDispatcherImpl.closeIntFD(kqfd);
            throw ioe;
        }

        // register one end of the socket pair for wakeups
        KQueue.register(kqfd, fd0, EVFILT_READ, EV_ADD); // os sc 這個地方猜測應該是呼叫的這個int kevent(int kq, const struct kevent *changelist, int nchanges, struct kevent *eventlist, int nevents, const struct timespec *timeout);
    }

這邊就是最簡單的了,做了一些屬性賦值的準備工作

核心就是在構造方法中體現出了kqueue的呼叫,應該是發生了兩次系統呼叫

  • kqueue()
  • kevent()

以下是從手冊中查到的資料

3 總結

關於多路複用器的一個具體實現java中的Selector就結束了

整個的實現步驟就將文件逆著看

這個複用器的實現依賴的是作業系統macos的kqueue(kqueue我也是第一次見到 具體os實現以後學到了再回來填坑todo)