1. 程式人生 > >深入淺出NIO之Selector實現原理

深入淺出NIO之Selector實現原理

開發十年,就只剩下這套架構體系了! >>>   

前言

Java NIO 由以下幾個核心部分組成:
1、Buffer
2、Channel
3、Selector

Buffer和Channel在深入淺出NIO之Channel、Buffer一文中已經介紹過,本文主要講解NIO的Selector實現原理。

之前進行socket程式設計時,accept方法會一直阻塞,直到有客戶端請求的到來,並返回socket進行相應的處理。整個過程是流水線的,處理完一個請求,才能去獲取並處理後面的請求,當然也可以把獲取socket和處理socket的過程分開,一個執行緒負責accept,一個執行緒池負責處理請求。

但NIO提供了更好的解決方案,採用選擇器(Selector)返回已經準備好的socket,並按順序處理,基於通道(Channel)和緩衝區(Buffer)來進行資料的傳輸。

Selector

這裡出來一個新概念,selector,具體是一個什麼樣的東西?

想想一個場景:在一個養雞場,有這麼一個人,每天的工作就是不停檢查幾個特殊的雞籠,如果有雞進來,有雞出去,有雞生蛋,有雞生病等等,就把相應的情況記錄下來,如果雞場的負責人想知道情況,只需要詢問那個人即可。

在這裡,這個人就相當Selector,每個雞籠相當於一個SocketChannel,每個執行緒通過一個Selector可以管理多個SocketChannel。

為了實現Selector管理多個SocketChannel,必須將具體的SocketChannel物件註冊到Selector,並宣告需要監聽的事件(這樣Selector才知道需要記錄什麼資料),一共有4種事件:

1、connect:客戶端連線服務端事件,對應值為SelectionKey.OP_CONNECT(8)
2、accept:服務端接收客戶端連線事件,對應值為SelectionKey.OP_ACCEPT(16)
3、read:讀事件,對應值為SelectionKey.OP_READ(1)
4、write:寫事件,對應值為SelectionKey.OP_WRITE(4)

這個很好理解,每次請求到達伺服器,都是從connect開始,connect成功後,服務端開始準備accept,準備就緒,開始讀資料,並處理,最後寫回資料返回。

所以,當SocketChannel有對應的事件發生時,Selector都可以觀察到,並進行相應的處理。

服務端程式碼

為了更好的理解,先看一段服務端的示例程式碼

ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
    int n = selector.select();
    if (n == 0) continue;
    Iterator ite = this.selector.selectedKeys().iterator();
    while(ite.hasNext()){
        SelectionKey key = (SelectionKey)ite.next();
        if (key.isAcceptable()){
            SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
            clntChan.configureBlocking(false);
            //將選擇器註冊到連線到的客戶端通道,
            //並指定該通道key值的屬性為OP_READ,
            //同時為該通道指定關聯的附件
            clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
        }
        if (key.isReadable()){
            handleRead(key);
        }
        if (key.isWritable() && key.isValid()){
            handleWrite(key);
        }
        if (key.isConnectable()){
            System.out.println("isConnectable = true");
        }
      ite.remove();
    }
}

服務端操作過程

1、建立ServerSocketChannel例項,並繫結指定埠;
2、建立Selector例項;
3、將serverSocketChannel註冊到selector,並指定事件OP_ACCEPT,最底層的socket通過channel和selector建立關聯;
4、如果沒有準備好的socket,select方法會被阻塞一段時間並返回0;
5、如果底層有socket已經準備好,selector的select方法會返回socket的個數,而且selectedKeys方法會返回socket對應的事件(connect、accept、read or write);
6、根據事件型別,進行不同的處理邏輯;

在步驟3中,selector只註冊了serverSocketChannel的OP_ACCEPT事件
1、如果有客戶端A連線服務,執行select方法時,可以通過serverSocketChannel獲取客戶端A的socketChannel,並在selector上註冊socketChannel的OP_READ事件。
2、如果客戶端A傳送資料,會觸發read事件,這樣下次輪詢呼叫select方法時,就能通過socketChannel讀取資料,同時在selector上註冊該socketChannel的OP_WRITE事件,實現伺服器往客戶端寫資料。

Selector實現原理

SocketChannel、ServerSocketChannel和Selector的例項初始化都通過SelectorProvider類實現,其中Selector是整個NIO Socket的核心實現。

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

SelectorProvider在windows和linux下有不同的實現,provider方法會返回對應的實現。

這裡不禁要問,Selector是如何做到同時管理多個socket?

下面我們看看Selector的具體實現,Selector初始化時,會例項化PollWrapper、SelectionKeyImpl陣列和Pipe。

WindowsSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    pollWrapper = new PollArrayWrapper(INIT_CAP);
    wakeupPipe = Pipe.open();
    wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

    // Disable the Nagle algorithm so that the wakeup is more immediate
    SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
    (sink.sc).socket().setTcpNoDelay(true);
    wakeupSinkFd = ((SelChImpl)sink).getFDVal();
    pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}

pollWrapper用Unsafe類申請一塊實體記憶體pollfd,存放socket控制代碼fdVal和events,其中pollfd共8位,0-3位儲存socket控制代碼,4-7位儲存events。

 

pollWrapper提供了fdVal和event資料的相應操作,如新增操作通過Unsafe的putInt和putShort實現。

void putDescriptor(int i, int fd) {
    pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
    pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}

先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何實現的

public final SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException {
    synchronized (regLock) {
        SelectionKey k = findKey(sel);
        if (k != null) {
            k.interestOps(ops);
            k.attach(att);
        }
        if (k == null) {
            // New registration
            synchronized (keyLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                k = ((AbstractSelector)sel).register(this, ops, att);
                addKey(k);
            }
        }
        return k;
    }
}
  1. 如果該channel和selector已經註冊過,則直接新增事件和附件。
  2. 否則通過selector實現註冊過程。
protected final SelectionKey register(AbstractSelectableChannel ch,
      int ops,  Object attachment) {
    if (!(ch instanceof SelChImpl))
        throw new IllegalSelectorException();
    SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
    k.attach(attachment);
    synchronized (publicKeys) {
        implRegister(k);
    }
    k.interestOps(ops);
    return k;
}

protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (pollWrapper == null)
            throw new ClosedSelectorException();
        growIfNeeded();
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        fdMap.put(ski);
        keys.add(ski);
        pollWrapper.addEntry(totalChannels, ski);
        totalChannels++;
    }
}

1、以當前channel和selector為引數,初始化SelectionKeyImpl 物件selectionKeyImpl ,並新增附件attachment。
2、如果當前channel的數量totalChannels等於SelectionKeyImpl陣列大小,對SelectionKeyImpl陣列和pollWrapper進行擴容操作。
3、如果totalChannels % MAX_SELECTABLE_FDS == 0,則多開一個執行緒處理selector。
4、pollWrapper.addEntry將把selectionKeyImpl中的socket控制代碼新增到對應的pollfd。
5、k.interestOps(ops)方法最終也會把event新增到對應的pollfd。

所以,不管serverSocketChannel,還是socketChannel,在selector註冊的事件,最終都儲存在pollArray中。

接著,再來看看selector中的select是如何實現一次獲取多個有事件發生的channel的,底層由selector實現類的doSelect方法實現,如下:

 protected int doSelect(long timeout) throws IOException {
        if (channelArray == null)
            throw new ClosedSelectorException();
        this.timeout = timeout; // set selector timeout
        processDeregisterQueue();
        if (interruptTriggered) {
            resetWakeupSocket();
            return 0;
        }
        // Calculate number of helper threads needed for poll. If necessary
        // threads are created here and start waiting on startLock
        adjustThreadsCount();
        finishLock.reset(); // reset finishLock
        // Wakeup helper threads, waiting on startLock, so they start polling.
        // Redundant threads will exit here after wakeup.
        startLock.startThreads();
        // do polling in the main thread. Main thread is responsible for
        // first MAX_SELECTABLE_FDS entries in pollArray.
        try {
            begin();
            try {
                subSelector.poll();
            } catch (IOException e) {
                finishLock.setException(e); // Save this exception
            }
            // Main thread is out of poll(). Wakeup others and wait for them
            if (threads.size() > 0)
                finishLock.waitForHelperThreads();
          } finally {
              end();
          }
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        finishLock.checkForException();
        processDeregisterQueue();
        int updated = updateSelectedKeys();
        // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
        resetWakeupSocket();
        return updated;
    }

其中 subSelector.poll() 是select的核心,由native函式poll0實現,readFds、writeFds 和exceptFds陣列用來儲存底層select的結果,陣列的第一個位置都是存放發生事件的socket的總數,其餘位置存放發生事件的socket控制代碼fd。

private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
private int poll() throws IOException{ // poll for the main thread
     return poll0(pollWrapper.pollArrayAddress,
          Math.min(totalChannels, MAX_SELECTABLE_FDS),
             readFds, writeFds, exceptFds, timeout);
}

執行 selector.select() ,poll0函式把指向socket控制代碼和事件的記憶體地址傳給底層函式。
1、如果之前沒有發生事件,程式就阻塞在select處,當然不會一直阻塞,因為epoll在timeout時間內如果沒有事件,也會返回;
2、一旦有對應的事件發生,poll0方法就會返回;
3、processDeregisterQueue方法會清理那些已經cancelled的SelectionKey;
4、updateSelectedKeys方法統計有事件發生的SelectionKey數量,並把符合條件發生事件的SelectionKey新增到selectedKeys雜湊表中,提供給後續使用。

在早期的JDK1.4和1.5 update10版本之前,Selector基於select/poll模型實現,是基於IO複用技術的非阻塞IO,不是非同步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。

read實現

通過遍歷selector中的SelectionKeyImpl陣列,獲取發生事件的socketChannel物件,其中儲存了對應的socket,實現如下

public int read(ByteBuffer buf) throws IOException {
    if (buf == null)
        throw new NullPointerException();
    synchronized (readLock) {
        if (!ensureReadOpen())
            return -1;
        int n = 0;
        try {
            begin();
            synchronized (stateLock) {
                if (!isOpen()) {         
                    return 0;
                }
                readerThread = NativeThread.current();
            }
            for (;;) {
                n = IOUtil.read(fd, buf, -1, nd);
                if ((n == IOStatus.INTERRUPTED) && isOpen()) {
                    // The system call was interrupted but the channel
                    // is still open, so retry
                    continue;
                }
                return IOStatus.normalize(n);
            }
        } finally {
            readerCleanup();        // Clear reader thread
            // The end method, which 
            end(n > 0 || (n == IOStatus.UNAVAILABLE));

            // Extra case for socket channels: Asynchronous shutdown
            //
            synchronized (stateLock) {
                if ((n <= 0) && (!isInputOpen))
                    return IOStatus.EOF;
            }
            assert IOStatus.check(n);
        }
    }
}

最終通過Buffer的方式讀取socket的資料。

wakeup實現

public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            setWakeupSocket();
            interruptTriggered = true;
        }
    }
    return this;
}

// Sets Windows wakeup socket to a signaled state.
private void setWakeupSocket() {
   setWakeupSocket0(wakeupSinkFd);
}
private native void setWakeupSocket0(int wakeupSinkFd);

看來wakeupSinkFd這個變數是為wakeup方法使用的。
其中interruptTriggered為中斷已觸發標誌,當pollWrapper.interrupt()之後,該標誌即為true了;因為這個標誌,連續兩次wakeup,只會有一次效果。

epoll原理

epoll是Linux下的一種IO多路複用技術,可以非常高效的處理數以百萬計的socket控制代碼。

三個epoll相關的系統呼叫:

  • int epoll_create(int size)
    epoll_create建立一個epoll物件。引數size是核心保證能夠正確處理的最大控制代碼數,多於這個最大數時核心可不保證效果。
  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
    epoll_ctl可以操作epoll_create建立的epoll,如將socket控制代碼加入到epoll中讓其監控,或把epoll正在監控的某個socket控制代碼移出epoll。
  • int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
    epoll_wait在呼叫時,在給定的timeout時間內,所監控的控制代碼中有事件發生時,就返回使用者態的程序。

epoll內部實現大概如下:

  1. epoll初始化時,會向核心註冊一個檔案系統,用於儲存被監控的控制代碼檔案,呼叫epoll_create時,會在這個檔案系統中建立一個file節點。同時epoll會開闢自己的核心快取記憶體區,以紅黑樹的結構儲存控制代碼,以支援快速的查詢、插入、刪除。還會再建立一個list連結串列,用於儲存準備就緒的事件。
  2. 當執行epoll_ctl時,除了把socket控制代碼放到epoll檔案系統裡file物件對應的紅黑樹上之外,還會給核心中斷處理程式註冊一個回撥函式,告訴核心,如果這個控制代碼的中斷到了,就把它放到準備就緒list連結串列裡。所以,當一個socket上有資料到了,核心在把網絡卡上的資料copy到核心中後,就把socket插入到就緒連結串列裡。
  3. 當epoll_wait呼叫時,僅僅觀察就緒連結串列裡有沒有資料,如果有資料就返回,否則就sleep,超時時立刻返回。

覺得不錯請點贊支援,歡迎留言或進我的個人群855801563領取【架構資料專題目合集90期】、【BATJTMD大廠JAVA面試真題1000+】,本群專用於學習交流技術、分享面試機會,拒絕廣告,我也會在群內不定