NIO 在Jetty中的應用
引子
作為縱橫情場多年的老手,憲程在把到妹子後通常有以下策略
(假設憲程是影流之主
的第1024代傳人並且只剩下了分身的能力)
-
將妹子存到佇列中,不時發微信去撩一下,如果有意向的話憲程會使用分身能力再建立一個
憲程
去把妹 -
憲程自己執行把妹的操作,如果期間又有新的妹子看上他咋辦呢,那就將該妹子交給自己的分身
憲程
去輪詢處理,並且憲程在把完妹子之後會嘗試去把分身憲程
的輪詢任務給接過來,畢竟本體總是要掌握主動權的,如果沒有接過來咋辦?只能選擇成為分身了,畢竟此時分身憲程
已經接過了本體的工作,某種意義上他已經成為了本體
。
Jetty NIO 模型
建議在閱讀之前先了解以下Tomcat的NIO模型,沒有對比就沒有傷害,你會發現Jetty NIO模型的有趣之處
概述
如果時間充足的話,我建議你直接閱讀附錄,瞭解如何Debug Jetty NIO功能
既然要了解Jetty的NIO模型,從執行緒的角度來說可以分為以下幾類
-
空閒執行緒
此角色會根據提交到執行緒池中的任務,將自己轉變為I/O執行緒或者輪詢執行緒 -
Acceptor執行緒
該角色主要負責接收來自客戶端的連線並對其進行封裝之後,選擇一個Selector來提交此任務 -
輪詢執行緒
此角色主要負責輪詢事件,並處理其他角色提交給此角色的任務,另外此角色可以根據所設定的策略將輪詢任務交給其他執行緒,在執行完I/O任務之後歸還到執行緒池中成為空閒執行緒
主要參與的類有
-
Connector
該角色主要負責JettyNIO模型中各個元件的啟動和,協調工作 -
SelectorManager
此角色主要對ManagedSelector
進行管理,想要和Selector進行互動可以使用此類 -
ManagedSelector
封裝了JDK原生的selector
,並對外提供對selector
執行操作的內部類、介面以及方法
重點 所有執行緒共用一個執行緒池
Connector
關鍵類
org.eclipse.jetty.server.ServerConnector
Connector即聯結器,是Jetty對於網路I/O模型的一個抽象,主要負責組裝,啟動Jetty NIO模型中所需要用到的元件。因此,我們主要注意力集中到其實現上也就是ServerConnector
初始化Connector聯結器,我們需要向其提供以下關鍵引數(隱去了和本文無關的引數,有興趣的可自行了解)
- 用來執行接收新連線、處理I/O、輪詢事件任務的執行緒池
- ByteBuffer 物件池,該物件池可以
回收
以及提供
ByteBuffer給I/O執行緒使用 - 負責執行
accept
操作執行緒的數量 - 負責執行輪詢任務的
selector
執行緒數量
但是,大部分的初始化工作並不是在ServerConnector
中執行的,而是在其父類中執行的操作,因此我們將目光轉移到
org.eclipse.jetty.server.AbstractConnector
該類的初始化程式碼如下,其主要做了以下工作
- 檢查是否指定執行緒池,如果沒有則和Server共用一個執行緒池
- 檢查是否指定ByteBufferPool,如果沒有則使用ArrayByteBuffer
- 檢查是否設定Acceptor數量,如果沒有則按照
max(1,min(4,CPU核心數÷8))
進行計算,也就是說預設的Acceptor數量最少有一個,最多有4個
想象一下,如果ServerSocketChannel被設定為阻塞狀態以便多個執行緒同時執行accept操作,那麼大多數情況下多數執行緒將會陷入阻塞狀態,並且執行緒從阻塞態恢復是有執行緒上下文切換的成本的因此Acceptor執行緒並不是越多越好
public AbstractConnector(
Server server,Executor executor,Scheduler scheduler,ByteBufferPool pool,int acceptors,ConnectionFactory... factories)
{
_server = server;
//檢查是否設定執行緒池,如果沒有則使用Server的
_executor = executor != null ? executor : _server.getThreadPool();
if (scheduler == null)
scheduler = _server.getBean(Scheduler.class);
_scheduler = scheduler != null ? scheduler : new ScheduledExecutorScheduler(String.format("Connector-Scheduler-%x",hashCode()),false);
// 檢查是否指定ByteBufferPool,如果沒有則自己建立一個
if (pool == null)
pool = _server.getBean(ByteBufferPool.class);
_byteBufferPool = pool != null ? pool : new ArrayByteBufferPool();
// 將這些物件交給Jetty統一管理(不在本文討論範圍內,不展開)
addBean(_server,false);
addBean(_executor);
if (executor == null)
unmanage(_executor); // inherited from server
addBean(_scheduler);
addBean(_byteBufferPool);
// ConnectionFactory主要使用來處理對應的HTTP協議
for (ConnectionFactory factory : factories)
{
addConnectionFactory(factory);
}
// 如果未指定Acceptor的數量則根據CPU核數執行計算
int cores = ProcessorUtils.availableProcessors();
if (acceptors < 0)
//根據此式可以推出Acceptor數量最大是4最小是1
acceptors = Math.max(1,Math.min(4,cores / 8));
// Acceptor數量大於CPU核心數
// 將會引起大量的執行緒陷入阻塞狀態
// 沒有東西可以accept不就阻塞了嗎
// 而要啟用阻塞的執行緒則需要切換執行緒上下文會引起效能的浪費
if (acceptors > cores)
LOG.warn("Acceptors should be <= availableProcessors: " + this);
_acceptors = new Thread[acceptors];
}
複製程式碼
如下圖所示我的電腦為4核心
的i5CPU,那麼預設的Acceptor執行緒應該只有一個
正如你所看到的,以qtp開頭的執行緒用於NIO的執行緒池,其中一個Acceptor執行緒阻塞在accept()方法上
Acceptor
Acceptor是一個定義在AbstractConnector
中的內部類,其主要工作不斷呼叫在子類中實現accept方法,也就是接收連線的實現延遲到了子類中。
其代如下,可以學到不少小技巧,如果你不想看程式碼,其總結如下
- 獲取執行當前程式碼執行緒,給他起個名字,見上一節JConsole的截圖
- 將Acceptor執行緒優先順序調至最高(當然,不一定起作用,還得看人作業系統理不理你)
- 在執行accept操作之前需要等待來自其他執行緒的放行訊號
- 不斷迴圈執行accept操作
public void run()
{
// 給執行緒起給名字
final Thread thread = Thread.currentThread();
String name = thread.getName();
_name = String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
thread.setName(_name);
// 設定優先順序
int priority = thread.getPriority();
if (_acceptorPriorityDelta != 0)
thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority + _acceptorPriorityDelta)));
// 儲存對此執行緒的引用
_acceptors[_id] = thread;
try
{
while (isRunning())
{
// 加鎖,等待來自其他執行緒的訊號說可以開始幹活了
try (Locker.Lock lock = _locker.lock())
{
if (!_accepting && isRunning())
{
_setAccepting.await();
continue;
}
}
catch (InterruptedException e)
{
continue;
}
try
{
//呼叫子類的accept方法
accept(_id);
}
catch (Throwable x)
{
if (!handleAcceptFailure(x))
break;
}
}
}
finally
{
// 發生異常了,則將執行緒的名稱以及優先順序調回原來的值
thread.setName(name);
if (_acceptorPriorityDelta != 0)
thread.setPriority(priority);
//釋放引用
synchronized (AbstractConnector.this)
{
_acceptors[_id] = null;
}
CountDownLatch stopping = _stopping;
if (stopping != null)
stopping.countDown();
}
}
複製程式碼
在子類ServerConnector
中,accept
主要執行以下操作
- 以
阻塞
的形式接收來自客戶端的連線 - 設定客戶端
SocketChannel
為非阻塞模式
,並禁用nagle演演算法
- 交給SelectorManager來處理,該類會將客戶端
SocketChannel
封裝成一個Accept
事件,交給輪詢執行緒
處理ServerConnector
中的程式碼
@Override
public void accept(int acceptorID) throws IOException
{
ServerSocketChannel serverChannel = _acceptChannel;
if (serverChannel != null && serverChannel.isOpen())
{
SocketChannel channel = serverChannel.accept();
accepted(channel);
}
}
private void accepted(SocketChannel channel) throws IOException
{
channel.configureBlocking(false);
Socket socket = channel.socket();
configure(socket); // socket.setTcpNoDelay(true);
_manager.accept(channel);
}
複製程式碼
SelectorManager中最終被呼叫的程式碼
public void accept(SelectableChannel channel,Object attachment)
{
final ManagedSelector selector = chooseSelector();
selector.submit(selector.new Accept(channel,attachment));
}
複製程式碼
輪詢執行緒
輪詢執行緒
主要負責輪詢I/O事件以及處理其他執行緒提交到本執行緒任務。並且我們可以為輪詢執行緒
指定執行策略,在後面我們可以看到執行策略將如何影響輪詢執行緒
行為。
首先,我們需要先明確哪些類會參與到輪詢執行緒的工作中,也就是說我們要先理清楚輪詢執行緒的呼叫鏈。
如上圖堆疊跟蹤圖紅框所標註的部分所示,參與到輪詢執行緒主要堆疊結構如下圖所示。
-
ManagedSelector
此類主要封裝了JDK的selector
類,並對外暴露操作此Selector的方法和類 -
EatWhatYouKill
此類即輪詢執行緒執行策略,該類會不斷呼叫SelectorProducer.produce 方法產生封裝好的I/O任務,並根據其策略來決定執行這個I/O任務的方式 -
SelectorProducer
此類為ManagedSelector
的內部類,實現執行緒執行策略裡面的ExecutionStrategy.Producer
介面,該類專門用於生成供輪詢執行緒處理的I/O任務
ManagedSelector
Jetty將JDK原生的Selector
類封裝成為ManagedSelector
,該類主要功能是對外暴露對其封裝的selector
執行操作的介面和內部類.
其關鍵方法和內部類如下
SelectorUpdate介面 如果要對ManagedSelector
所管理的selector
進行更新(如執行註冊感興趣的I/O事件)可以實現此介面,該介面定義如下
public interface SelectorUpdate
{
void update(Selector selector);
}
複製程式碼
submit方法 該方法主要用於外界將SelectorUpdate
提交到輪詢執行緒中以便執行對Selector
的更新操作,簡單來說此方法會執行以下操作
- 將update事件加入佇列
- 檢查Selector是否正在執行select操作,如果是則將其喚醒,使其從阻塞狀態返回以便我們對其進行更新
public void submit(SelectorUpdate update)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}",update,this);
Selector selector = null;
synchronized (ManagedSelector.this)
{
//加事件加入處理佇列
_updates.offer(update);
//檢查是否正在輪詢,如果正在輪詢,則會執行喚醒操作
//因此在此處需要將selecting置為false
if (_selecting)
{
selector = _selector;
// To avoid the extra select wakeup.
_selecting = false;
}
}
if (selector != null)
{
//執行喚醒操作,以便對selector執行更新操作
if (LOG.isDebugEnabled())
LOG.debug("Wakeup on submit {}",this);
selector.wakeup();
}
}
複製程式碼
SelectorProducer
SelectorProducer
是ManagedSelector
的內部類,該類實現了輪詢執行緒執行策略的ExecutionStrategy.Producer
介面
interface Producer
{
// 返回一個Runnable任務供輪詢執行緒執行
Runnable produce();
}
複製程式碼
因此SelectorProducer
需要不斷呼叫selector
去輪詢看有無新的I/O事件以供處理,除此之外它還需要處理外部類向ManagedSelector
通過呼叫submit
方法提交的SelectorUpdate
任務
其向執行緒執行策略類所提供produce
方法代如下所示,總的來說主要完成以下幾項工作
- 執行一個迴圈,直到輪詢到感興趣的任務(一次只返回一個,被輪詢到事件會被儲存起來供下一次使用)
- 處理外部類向其提交的任務(呼叫
processUpdates
) - 更新客戶端SocketChannel感興趣的事件
@Override
public Runnable produce()
{
while (true)
{
//處理之前查詢到事件
Runnable task = processSelected();
if (task != null)
return task;
//處理外部類所提交的update任務
//該方法最終會導致提交的SelectorUpdate.update被呼叫
processUpdates();
//此方法的呼叫可能會
//導致客戶端SocketChannel感興趣的事件發生變更
updateKeys();
//執行select操作,並將查詢到事件儲存起來
if (!select())
return null;
}
}
複製程式碼
processUpdates
此方法主要是處理外部類提交的SelectorUpdate
任務,通過複製引用非常巧妙的避免了併發問題
private void processUpdates()
{
synchronized (ManagedSelector.this)
{
//倒騰資料,將要處理佇列的引用儲存
//到另一個變數上,原有的引用可以繼續對外提供服務
//整個資料倒騰過程非常短,效能影響較小
Deque<SelectorUpdate> updates = _updates;
_updates = _updateable;
_updateable = updates;
}
if (LOG.isDebugEnabled())
LOG.debug("updateable {}",_updateable.size());
//遍歷事件佇列,處理update方法
for (SelectorUpdate update : _updateable)
{
if (_selector == null)
break;
try
{
if (LOG.isDebugEnabled())
LOG.debug("update {}",update);
//呼叫事件的update方法,並傳入selector
update.update(_selector);
}
catch (Throwable ex)
{
LOG.warn(ex);
}
}
_updateable.clear();
Selector selector;
int updates;
//再次檢查是否有新的事件被提交,如果有則執行喚醒操作
synchronized (ManagedSelector.this)
{
//外部類提交的任務會儲存到updates中
updates = _updates.size();
_selecting = updates == 0;
selector = _selecting ? null : _selector;
}
if (LOG.isDebugEnabled())
LOG.debug("updates {}",updates);
if (selector != null)
{
if (LOG.isDebugEnabled())
LOG.debug("wakeup on updates {}",this);
selector.wakeup();
}
}
複製程式碼
select() 該方法主要執行輪詢操作,並將輪詢到事件儲存起來以供下一次迴圈的時候返回,在這個方法中展現jetty如何處理空輪詢
事件(空輪詢
是指selector在執行select操作時,沒有查詢到任何事件卻返回了,這個BUG通常會造成CPU100%
的使用率,從而使系統崩潰)
private boolean select()
{
try
{
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} waiting with {} keys",selector,selector.keys().size());
int selected = selector.select();
//沒查詢到事件,空輪詢事件處理
if (selected == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken with none selected",selector);
//如果執行緒被中斷,並且標誌位被設定了不在執行則執行推出邏輯
if (Thread.interrupted() && !isRunning())
throw new ClosedSelectorException();
//開啟了此引數則立即執行一次select操作
if (FORCE_SELECT_NOW)
selected = selector.selectNow();
}
if (LOG.isDebugEnabled())
LOG.debug("Selector {} woken up from select,{}/{}/{} selected",selected,selector.selectedKeys().size(),selector.keys().size());
int updates;
synchronized (ManagedSelector.this)
{
// 完成了select操作則設定標誌位
_selecting = false;
updates = _updates.size();
}
_keys = selector.selectedKeys();
_cursor = _keys.isEmpty() ? Collections.emptyIterator() : _keys.iterator();
if (LOG.isDebugEnabled())
LOG.debug("Selector {} processing {} keys,{} updates",_keys.size(),updates);
return true;
}
}
catch (Throwable x)
{
_selector = null;
if (isRunning())
LOG.warn(x);
else
{
LOG.warn(x.toString());
LOG.debug(x);
}
closeNoExceptions(_selector);
}
return false;
}
複製程式碼
與Netty的空輪詢處理策略不同,Jetty的處理策略是再select一次並立即返回,但這樣似乎並不能解決空輪詢的BUG問題詳情
EatWhatYouKill
EatWhatYouKill
是執行緒執行策略的一種,也是Jetty預設的指策略,其思想來源於如果獵人殺死一隻獵物,那麼獵人就應該吃掉它
(如果你吃過新鮮的蝦你就會對這種哲學
深有體會),換種說法就是輪詢執行緒如果查詢到一次I/O事件就應該直接處理它
(想起引子了嗎)
P.S. 關鍵程式碼
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill
之所以這樣做的原因是因為切換執行緒是一件比較費時操作(相對來說),因此在這種策略下輪詢執行緒A如果獲取到一個事件會有以下策略
- 如果此任務被
標誌為非阻塞任務
,那麼執行緒A會立即執行
此任務
如果任務阻塞型別未知或者被標記為阻塞狀態
-
如果執行緒池中的執行緒都處於
繁忙
狀態,則將其提交到執行緒池種等待執行 -
如果執行緒池種有空閒執行緒B,則嘗試將執行緒A負責
輪詢功
能交給執行緒B,如果立即獲取到執行緒B
成功,則執行緒A會直接執行獲取到的任務,任務執行完成後,執行緒A會嘗試奪回
交給執行緒B的輪詢任務,如果奪回失敗則變為空閒執行緒等待分配任務。(想起引子了嗎?) -
除此之外,執行緒A還會嘗試直接執行任務並且不會交出輪詢工作 (程式碼太長,只摘出關鍵程式碼)
case BLOCKING:
synchronized (this)
{
if (_pending)
{
//輪詢工作陷入了停滯,因此是IDLE狀態
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}
//tryExecute 如果立即分配到了執行緒則返回true
//this的run方法也就是實現輪詢執行緒核心的方法
//因此此行程式碼相當於將輪詢的工作轉移給了其他執行緒
else if (_tryExecutor.tryExecute(this))
{
_pending = true;
//由於輪詢工作的轉移
//因此當前輪詢工作相當於陷入空閒狀態
//所以需要將此物件的狀態至為IDLE
//(輪詢執行緒和當前執行緒使用同一個物件)
_state = State.IDLE;
mode = Mode.EXECUTE_PRODUCE_CONSUME;
}else
{
//前兩者均不滿足則將任務提交到執行緒池
mode = Mode.PRODUCE_EXECUTE_CONSUME;
}
}
break;
複製程式碼
任務的執行策略
case EXECUTE_PRODUCE_CONSUME:
_epcMode.increment();
//直接在當前執行緒呼叫
runTask(task);
// 嘗試奪回輪詢任務
synchronized (this)
{
// 如果State還處於空閒狀態
// 說明
// 執行緒B還未開始執行輪詢任務,可以直接奪回
// 如果執行緒B已經開始輪詢
// 則選擇離開
if (_state == State.IDLE)
{
// 返回true則繼續輪詢
return true;
}
}
//返回false則結束輪詢任務,變為空閒執行緒
return false;
複製程式碼
總結
相較於循規蹈矩的Tomcat,Jetty的設計更為激進,更富有冒險主義者的精神,從個人角度來說更喜歡Jetty的設計,但從業務的角度來說還是選擇Tomcat較為穩妥畢竟穩定是業務的基本需求,並且Tomcat的效能也不會太差。
以執行緒的類別來進行劃分的話,Jetty的NIO模型如下圖所示
-
Acceptor
執行緒負責接收來自客戶端的新連線,並將其封裝成一個事件提交給輪詢執行緒處理 -
輪詢執行緒
輪詢執行緒處理負責輪詢I/O事件之外,還需要處理外部執行緒所提交的selector
更新任務,並且根據設定的執行策略,輪詢執行緒可能會在本執行緒直接執行I/O任務,並將輪詢任務移交給其他空閒的執行緒,或者選擇一個空閒的執行緒來執行I/O操作 -
I/O執行緒
主要負責處理I/O操作
從執行緒類別的角度來看Jetty的NIO模型相對簡單,但其引入的輪詢執行緒執行策略使得執行緒之間身份可以發生轉變,得益於此Jetty可以直接輪詢執行緒直接執行I/O任務減少了執行緒上下文切換所帶來的效能消耗,提升了效能。
思想遷移
切換執行緒是有成本的 Jetty通過直接在輪詢執行緒執行I/O任務來提升效能,來減少執行緒上下文的切換,除此之外,我們還可以實現協程的機制來減少執行緒上下文切換所帶來的成本(參考Go語言)
Acceptor執行緒應適量 如果將ServerSocket設定為阻塞模式,那麼accept操作將導致執行緒陷入阻塞,從accept方法返回時將引起執行緒上下的切換,因此並不是越多越好
如何Debug Jetty
我們使用SpringBoot來Debug Jetty,因此我們需要在pom.xml
中引入Jetty,由於SpringBoot預設使用Tomcat因此我們需要將其替換掉,依賴如下所示.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
</dependency>
複製程式碼
使用的SpringBoot版本是2.2.0其所依賴的Jetty版本號是9.4.20
-
如果你要了解Connector是如何工作的請關注以下類
org.eclipse.jetty.server.ServerConnector
-
如果你想要了解Jetty NIO 如何輪詢以及處理事件,那麼請關注以下類
org.eclipse.jetty.io.ManagedSelector
並在其內部類SelectorProducer
的produce
方法打上斷點,如下圖所示,你將瞭解到整個輪詢過程中都發生了什麼
右鍵小紅點,選擇Thread,以避免進入不了斷點的情況,畢竟我們除錯的是多執行緒程式
- 如果你想要了解執行緒執行的策略,那麼請關注以下類(此類執行機制較為複雜,如果想Debug到所有的情況,最好結合一定的策略,如在Controller程式碼處阻塞住執行緒等)
org.eclipse.jetty.util.thread.strategy.EatWhatYouKill