1. 程式人生 > 程式設計 >NIO 在Jetty中的應用

NIO 在Jetty中的應用

引子

作為縱橫情場多年的老手,憲程在把到妹子後通常有以下策略 (假設憲程是影流之主的第1024代傳人並且只剩下了分身的能力)

  • 將妹子存到佇列中,不時發微信去撩一下,如果有意向的話憲程會使用分身能力再建立一個憲程去把妹

  • 憲程自己執行把妹的操作,如果期間又有新的妹子看上他咋辦呢,那就將該妹子交給自己的分身憲程去輪詢處理,並且憲程在把完妹子之後會嘗試去把分身憲程的輪詢任務給接過來,畢竟本體總是要掌握主動權的,如果沒有接過來咋辦?只能選擇成為分身了,畢竟此時分身憲程已經接過了本體的工作,某種意義上他已經成為了本體

Jetty NIO 模型

建議在閱讀之前先了解以下Tomcat的NIO模型,沒有對比就沒有傷害,你會發現Jetty NIO模型的有趣之處

概述

如果時間充足的話,我建議你直接閱讀附錄,瞭解如何Debug Jetty NIO功能

既然要了解Jetty的NIO模型,從執行緒的角度來說可以分為以下幾類

  • 空閒執行緒 此角色會根據提交到執行緒池中的任務,將自己轉變為I/O執行緒或者輪詢執行緒

  • Acceptor執行緒 該角色主要負責接收來自客戶端的連線並對其進行封裝之後,選擇一個Selector來提交此任務

  • 輪詢執行緒 此角色主要負責輪詢事件,並處理其他角色提交給此角色的任務,另外此角色可以根據所設定的策略將輪詢任務交給其他執行緒,在執行完I/O任務之後歸還到執行緒池中成為空閒執行緒

主要參與的類有

  • Connector 該角色主要負責JettyNIO模型中各個元件的啟動和,協調工作

  • SelectorManager 此角色主要對ManagedSelector進行管理,想要和Selector進行互動可以使用此類

  • ManagedSelector 封裝了JDK原生的selector,並對外提供對selector執行操作的內部類、介面以及方法

重點 所有執行緒共用一個執行緒池

Connector

關鍵類 org.eclipse.jetty.server.ServerConnector

Connector即聯結器,是Jetty對於網路I/O模型的一個抽象,主要負責組裝,啟動Jetty NIO模型中所需要用到的元件。因此,我們主要注意力集中到其實現上也就是ServerConnector

上。

初始化Connector聯結器,我們需要向其提供以下關鍵引數(隱去了和本文無關的引數,有興趣的可自行了解)

  • 用來執行接收新連線、處理I/O、輪詢事件任務的執行緒池
  • ByteBuffer 物件池,該物件池可以回收以及提供ByteBuffer給I/O執行緒使用
  • 負責執行accept操作執行緒的數量
  • 負責執行輪詢任務的selector執行緒數量

但是,大部分的初始化工作並不是在ServerConnector中執行的,而是在其父類中執行的操作,因此我們將目光轉移到 org.eclipse.jetty.server.AbstractConnector

該類的初始化程式碼如下,其主要做了以下工作

  • 檢查是否指定執行緒池,如果沒有則和Server共用一個執行緒池
  • 檢查是否指定ByteBufferPool,如果沒有則使用ArrayByteBuffer
  • 檢查是否設定Acceptor數量,如果沒有則按照max(1,min(4,CPU核心數÷8))進行計算,也就是說預設的Acceptor數量最少有一個,最多有4個

想象一下,如果ServerSocketChannel被設定為阻塞狀態以便多個執行緒同時執行accept操作,那麼大多數情況下多數執行緒將會陷入阻塞狀態,並且執行緒從阻塞態恢復是有執行緒上下文切換的成本的因此Acceptor執行緒並不是越多越好

    public AbstractConnector(
        Server server,Executor executor,Scheduler scheduler,ByteBufferPool pool,int acceptors,ConnectionFactory... factories)
    {
        _server = server;
        //檢查是否設定執行緒池,如果沒有則使用Server的
        _executor = executor != null ? executor : _server.getThreadPool();
        if (scheduler == null)
            scheduler = _server.getBean(Scheduler.class);
        _scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x",hashCode()),false);
        
        // 檢查是否指定ByteBufferPool,如果沒有則自己建立一個
        if (pool == null)
            pool = _server.getBean(ByteBufferPool.class);
        _byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
        // 將這些物件交給Jetty統一管理(不在本文討論範圍內,不展開)
        addBean(_server,false);
        addBean(_executor);
        if (executor == null)
            unmanage(_executor); // inherited from server
        addBean(_scheduler);
        addBean(_byteBufferPool);
        // ConnectionFactory主要使用來處理對應的HTTP協議
        for (ConnectionFactory factory : factories)
        {
            addConnectionFactory(factory);
        }
        // 如果未指定Acceptor的數量則根據CPU核數執行計算
        int cores = ProcessorUtils.availableProcessors();
        if (acceptors < 0)
           //根據此式可以推出Acceptor數量最大是4最小是1
            acceptors = Math.max(1,Math.min(4,cores / 8));
        // Acceptor數量大於CPU核心數
        // 將會引起大量的執行緒陷入阻塞狀態
        // 沒有東西可以accept不就阻塞了嗎
        // 而要啟用阻塞的執行緒則需要切換執行緒上下文會引起效能的浪費
        if (acceptors > cores)
            LOG.warn("Acceptors should be <= availableProcessors: " + this);
        _acceptors = new Thread[acceptors];
    }

複製程式碼

如下圖所示我的電腦為4核心的i5CPU,那麼預設的Acceptor執行緒應該只有一個

4核心CPU
在啟動你的Jetty之後我們可以用JConsole來驗證一下
正如你所看到的,以qtp開頭的執行緒用於NIO的執行緒池,其中一個Acceptor執行緒阻塞在accept()方法上

Acceptor

Acceptor是一個定義在AbstractConnector中的內部類,其主要工作不斷呼叫在子類中實現accept方法,也就是接收連線的實現延遲到了子類中。

其代如下,可以學到不少小技巧,如果你不想看程式碼,其總結如下

  • 獲取執行當前程式碼執行緒,給他起個名字,見上一節JConsole的截圖
  • 將Acceptor執行緒優先順序調至最高(當然,不一定起作用,還得看人作業系統理不理你)
  • 在執行accept操作之前需要等待來自其他執行緒的放行訊號
  • 不斷迴圈執行accept操作
      public void run()
        {
           // 給執行緒起給名字
            final Thread thread = Thread.currentThread();
            String name = thread.getName();
            _name = String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
            thread.setName(_name);
            // 設定優先順序
            int priority = thread.getPriority();
            if (_acceptorPriorityDelta != 0)
                thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority + _acceptorPriorityDelta)));
            // 儲存對此執行緒的引用
            _acceptors[_id] = thread;
            
            try
            {
                while (isRunning())
                {
                    // 加鎖,等待來自其他執行緒的訊號說可以開始幹活了
                    try (Locker.Lock lock = _locker.lock())
                    {
                        if (!_accepting && isRunning())
                        {
                            _setAccepting.await();
                            continue;
                        }
                    }
                    catch (InterruptedException e)
                    {
                        continue;
                    }

                    try
                    {
                       //呼叫子類的accept方法
                        accept(_id);
                    }
                    catch (Throwable x)
                    {
                        if (!handleAcceptFailure(x))
                            break;
                    }
                }
            }
            finally
            {
               // 發生異常了,則將執行緒的名稱以及優先順序調回原來的值
                thread.setName(name);
                if (_acceptorPriorityDelta != 0)
                    thread.setPriority(priority);
                
                //釋放引用
                synchronized (AbstractConnector.this)
                {
                    _acceptors[_id] = null;
                }
                CountDownLatch stopping = _stopping;
                if (stopping != null)
                    stopping.countDown();
            }
        }
複製程式碼

在子類ServerConnector中,accept主要執行以下操作

  • 阻塞的形式接收來自客戶端的連線
  • 設定客戶端SocketChannel非阻塞模式,並禁用nagle演演算法
  • 交給SelectorManager來處理,該類會將客戶端SocketChannel封裝成一個Accept事件,交給輪詢執行緒處理 ServerConnector中的程式碼
    @Override
    public void accept(int acceptorID) throws IOException
    {
        ServerSocketChannel serverChannel = _acceptChannel;
        if (serverChannel != null && serverChannel.isOpen())
        {
            SocketChannel channel = serverChannel.accept();
            accepted(channel);
        }
    }

    private void accepted(SocketChannel channel) throws IOException
    {
        channel.configureBlocking(false);
        Socket socket = channel.socket();
        configure(socket); // socket.setTcpNoDelay(true);
        _manager.accept(channel);
    }
複製程式碼

SelectorManager中最終被呼叫的程式碼

    public void accept(SelectableChannel channel,Object attachment)
    {
        final ManagedSelector selector = chooseSelector();
        selector.submit(selector.new Accept(channel,attachment));
    }
複製程式碼

輪詢執行緒

輪詢執行緒主要負責輪詢I/O事件以及處理其他執行緒提交到本執行緒任務。並且我們可以為輪詢執行緒指定執行策略,在後面我們可以看到執行策略將如何影響輪詢執行緒行為。

首先,我們需要先明確哪些類會參與到輪詢執行緒的工作中,也就是說我們要先理清楚輪詢執行緒的呼叫鏈。

如上圖堆疊跟蹤圖紅框所標註的部分所示,參與到輪詢執行緒主要堆疊結構如下圖所示。

  • ManagedSelector 此類主要封裝了JDK的selector類,並對外暴露操作此Selector的方法和類
  • EatWhatYouKill 此類即輪詢執行緒執行策略,該類會不斷呼叫SelectorProducer.produce 方法產生封裝好的I/O任務,並根據其策略來決定執行這個I/O任務的方式
  • SelectorProducer 此類為ManagedSelector的內部類,實現執行緒執行策略裡面的ExecutionStrategy.Producer介面,該類專門用於生成供輪詢執行緒處理的I/O任務

ManagedSelector

Jetty將JDK原生的Selector類封裝成為ManagedSelector,該類主要功能是對外暴露對其封裝的selector執行操作的介面和內部類. 其關鍵方法和內部類如下

SelectorUpdate介面 如果要對ManagedSelector所管理的selector進行更新(如執行註冊感興趣的I/O事件)可以實現此介面,該介面定義如下

    public interface SelectorUpdate
    {
        void update(Selector selector);
    }
複製程式碼

submit方法 該方法主要用於外界將SelectorUpdate提交到輪詢執行緒中以便執行對Selector的更新操作,簡單來說此方法會執行以下操作

  • 將update事件加入佇列
  • 檢查Selector是否正在執行select操作,如果是則將其喚醒,使其從阻塞狀態返回以便我們對其進行更新
    public void submit(SelectorUpdate update)
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Queued change {} on {}",update,this);

        Selector selector = null;
        synchronized (ManagedSelector.this)
        {
            //加事件加入處理佇列
            _updates.offer(update);
            //檢查是否正在輪詢,如果正在輪詢,則會執行喚醒操作
            //因此在此處需要將selecting置為false
            if (_selecting)
            {
                selector = _selector;
                // To avoid the extra select wakeup.
                _selecting = false;
            }
        }

        if (selector != null)
        {
           //執行喚醒操作,以便對selector執行更新操作
            if (LOG.isDebugEnabled())
                LOG.debug("Wakeup on submit {}",this);
            selector.wakeup();
        }
    }
複製程式碼

SelectorProducer

SelectorProducerManagedSelector的內部類,該類實現了輪詢執行緒執行策略的ExecutionStrategy.Producer介面

    interface Producer
    {
        // 返回一個Runnable任務供輪詢執行緒執行
        Runnable produce();
    }
複製程式碼

因此SelectorProducer需要不斷呼叫selector去輪詢看有無新的I/O事件以供處理,除此之外它還需要處理外部類向ManagedSelector通過呼叫submit方法提交的SelectorUpdate任務

其向執行緒執行策略類所提供produce方法代如下所示,總的來說主要完成以下幾項工作

  • 執行一個迴圈,直到輪詢到感興趣的任務(一次只返回一個,被輪詢到事件會被儲存起來供下一次使用)
  • 處理外部類向其提交的任務(呼叫processUpdates)
  • 更新客戶端SocketChannel感興趣的事件
        @Override
        public Runnable produce()
        {
            while (true)
            {
                //處理之前查詢到事件
                Runnable task = processSelected();
                if (task != null)
                    return task;
                //處理外部類所提交的update任務
                //該方法最終會導致提交的SelectorUpdate.update被呼叫
                processUpdates();
                //此方法的呼叫可能會
                //導致客戶端SocketChannel感興趣的事件發生變更
                updateKeys();
                //執行select操作,並將查詢到事件儲存起來
                if (!select())
                    return null;
            }
        }
複製程式碼

processUpdates 此方法主要是處理外部類提交的SelectorUpdate任務,通過複製引用非常巧妙的避免了併發問題

        private void processUpdates()
        {
            synchronized (ManagedSelector.this)
            {
                //倒騰資料,將要處理佇列的引用儲存
                //到另一個變數上,原有的引用可以繼續對外提供服務
                //整個資料倒騰過程非常短,效能影響較小
                Deque<SelectorUpdate> updates = _updates;
                _updates = _updateable;
                _updateable = updates;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updateable {}",_updateable.size());
            //遍歷事件佇列,處理update方法
            for (SelectorUpdate update : _updateable)
            {
                if (_selector == null)
                    break;
                try
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("update {}",update);
                    //呼叫事件的update方法,並傳入selector
                    update.update(_selector);
                }
                catch (Throwable ex)
                {
                    LOG.warn(ex);
                }
            }
            _updateable.clear();

            Selector selector;
            int updates;
            //再次檢查是否有新的事件被提交,如果有則執行喚醒操作
            synchronized (ManagedSelector.this)
            {
               //外部類提交的任務會儲存到updates中
                updates = _updates.size();
                _selecting = updates == 0;
                selector = _selecting ? null : _selector;
            }

            if (LOG.isDebugEnabled())
                LOG.debug("updates {}",updates);

            if (selector != null)
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("wakeup on updates {}",this);
                selector.wakeup();
            }
        }

複製程式碼

select() 該方法主要執行輪詢操作,並將輪詢到事件儲存起來以供下一次迴圈的時候返回,在這個方法中展現jetty如何處理空輪詢事件(空輪詢是指selector在執行select操作時,沒有查詢到任何事件卻返回了,這個BUG通常會造成CPU100%的使用率,從而使系統崩潰)

        private boolean select()
        {
            try
            {
                Selector selector = _selector;
                if (selector != null && selector.isOpen())
                {
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} waiting with {} keys",selector,selector.keys().size());
                    int selected = selector.select();
                    //沒查詢到事件,空輪詢事件處理
                    if (selected == 0)
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Selector {} woken with none selected",selector);
                        //如果執行緒被中斷,並且標誌位被設定了不在執行則執行推出邏輯
                        if (Thread.interrupted() && !isRunning())
                            throw new ClosedSelectorException();
                        //開啟了此引數則立即執行一次select操作
                        if (FORCE_SELECT_NOW)
                            selected = selector.selectNow();
                    }
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} woken up from select,{}/{}/{} selected",selected,selector.selectedKeys().size(),selector.keys().size());

                    int updates;
                    synchronized (ManagedSelector.this)
                    {
                        // 完成了select操作則設定標誌位
                        _selecting = false;
                        updates = _updates.size();
                    }

                    _keys = selector.selectedKeys();
                    _cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
                    if (LOG.isDebugEnabled())
                        LOG.debug("Selector {} processing {} keys,{} updates",_keys.size(),updates);

                    return true;
                }
            }
            catch (Throwable x)
            {
                _selector = null;
                if (isRunning())
                    LOG.warn(x);
                else
                {
                    LOG.warn(x.toString());
                    LOG.debug(x);
                }
                closeNoExceptions(_selector);
            }
            return false;
        }
複製程式碼

與Netty的空輪詢處理策略不同,Jetty的處理策略是再select一次並立即返回,但這樣似乎並不能解決空輪詢的BUG問題詳情

EatWhatYouKill

EatWhatYouKill是執行緒執行策略的一種,也是Jetty預設的指策略,其思想來源於如果獵人殺死一隻獵物,那麼獵人就應該吃掉它(如果你吃過新鮮的蝦你就會對這種哲學深有體會),換種說法就是輪詢執行緒如果查詢到一次I/O事件就應該直接處理它(想起引子了嗎)

P.S. 關鍵程式碼org.eclipse.jetty.util.thread.strategy.EatWhatYouKill

之所以這樣做的原因是因為切換執行緒是一件比較費時操作(相對來說),因此在這種策略下輪詢執行緒A如果獲取到一個事件會有以下策略

  • 如果此任務被標誌為非阻塞任務,那麼執行緒A會立即執行此任務

如果任務阻塞型別未知或者被標記為阻塞狀態

  • 如果執行緒池中的執行緒都處於繁忙狀態,則將其提交到執行緒池種等待執行

  • 如果執行緒池種有空閒執行緒B,則嘗試將執行緒A負責輪詢功能交給執行緒B,如果立即獲取到執行緒B成功,則執行緒A會直接執行獲取到的任務,任務執行完成後,執行緒A會嘗試奪回交給執行緒B的輪詢任務,如果奪回失敗則變為空閒執行緒等待分配任務。(想起引子了嗎?)

  • 除此之外,執行緒A還會嘗試直接執行任務並且不會交出輪詢工作 (程式碼太長,只摘出關鍵程式碼)

    case BLOCKING:
        synchronized (this)
        {
            if (_pending)
            {
                //輪詢工作陷入了停滯,因此是IDLE狀態
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }
            //tryExecute 如果立即分配到了執行緒則返回true
            //this的run方法也就是實現輪詢執行緒核心的方法
            //因此此行程式碼相當於將輪詢的工作轉移給了其他執行緒
            else if (_tryExecutor.tryExecute(this))
            {
                _pending = true;
                //由於輪詢工作的轉移
                //因此當前輪詢工作相當於陷入空閒狀態
                //所以需要將此物件的狀態至為IDLE
                //(輪詢執行緒和當前執行緒使用同一個物件)
                _state = State.IDLE;
                mode = Mode.EXECUTE_PRODUCE_CONSUME;
            }else
            {
               //前兩者均不滿足則將任務提交到執行緒池
                mode = Mode.PRODUCE_EXECUTE_CONSUME;
            }
        }
        break;
複製程式碼

任務的執行策略

            case EXECUTE_PRODUCE_CONSUME:
                _epcMode.increment();
                //直接在當前執行緒呼叫
                runTask(task);

                // 嘗試奪回輪詢任務
                synchronized (this)
                {
                   // 如果State還處於空閒狀態
                   // 說明
                   // 執行緒B還未開始執行輪詢任務,可以直接奪回
                   // 如果執行緒B已經開始輪詢
                   // 則選擇離開
                    if (_state == State.IDLE)
                    {
                        // 返回true則繼續輪詢
                        return true;
                    }
                }
                //返回false則結束輪詢任務,變為空閒執行緒
                return false;
複製程式碼

總結

相較於循規蹈矩的Tomcat,Jetty的設計更為激進,更富有冒險主義者的精神,從個人角度來說更喜歡Jetty的設計,但從業務的角度來說還是選擇Tomcat較為穩妥畢竟穩定是業務的基本需求,並且Tomcat的效能也不會太差。

以執行緒的類別來進行劃分的話,Jetty的NIO模型如下圖所示

  • Acceptor 執行緒負責接收來自客戶端的新連線,並將其封裝成一個事件提交給輪詢執行緒處理
  • 輪詢執行緒 輪詢執行緒處理負責輪詢I/O事件之外,還需要處理外部執行緒所提交的selector更新任務,並且根據設定的執行策略,輪詢執行緒可能會在本執行緒直接執行I/O任務,並將輪詢任務移交給其他空閒的執行緒,或者選擇一個空閒的執行緒來執行I/O操作
  • I/O執行緒 主要負責處理I/O操作

從執行緒類別的角度來看Jetty的NIO模型相對簡單,但其引入的輪詢執行緒執行策略使得執行緒之間身份可以發生轉變,得益於此Jetty可以直接輪詢執行緒直接執行I/O任務減少了執行緒上下文切換所帶來的效能消耗,提升了效能。

思想遷移

切換執行緒是有成本的 Jetty通過直接在輪詢執行緒執行I/O任務來提升效能,來減少執行緒上下文的切換,除此之外,我們還可以實現協程的機制來減少執行緒上下文切換所帶來的成本(參考Go語言)

Acceptor執行緒應適量 如果將ServerSocket設定為阻塞模式,那麼accept操作將導致執行緒陷入阻塞,從accept方法返回時將引起執行緒上下的切換,因此並不是越多越好

如何Debug Jetty

我們使用SpringBoot來Debug Jetty,因此我們需要在pom.xml中引入Jetty,由於SpringBoot預設使用Tomcat因此我們需要將其替換掉,依賴如下所示.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-tomcat</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jetty</artifactId>
        </dependency>
複製程式碼

使用的SpringBoot版本是2.2.0其所依賴的Jetty版本號是9.4.20

  • 如果你要了解Connector是如何工作的請關注以下類 org.eclipse.jetty.server.ServerConnector

  • 如果你想要了解Jetty NIO 如何輪詢以及處理事件,那麼請關注以下類 org.eclipse.jetty.io.ManagedSelector 並在其內部類 SelectorProducerproduce方法打上斷點,如下圖所示,你將瞭解到整個輪詢過程中都發生了什麼

右鍵小紅點,選擇Thread,以避免進入不了斷點的情況,畢竟我們除錯的是多執行緒程式

  • 如果你想要了解執行緒執行的策略,那麼請關注以下類(此類執行機制較為複雜,如果想Debug到所有的情況,最好結合一定的策略,如在Controller程式碼處阻塞住執行緒等) org.eclipse.jetty.util.thread.strategy.EatWhatYouKill