Jetty NIO模型
概述
jetty NIO是典型reactor模型,如下圖所示:
即:mainReactor負責監聽server socket,接受新連線,並將建立的socket分派給subReactor。subReactor負責多路分離已連線的socket,讀寫網路資料,扔給worker執行緒池來處理。本文主要是講解jetty中mainReactor、subReactor、執行緒池的實現。
mainReactor
jetty中的server就相當於一個容器,一個jetty容器包含多個聯結器和一個執行緒池,聯結器實現了LifeCycle介面,隨容器啟動而啟動,下圖是聯結器啟動後,監聽server socket,建立連線的過程:
可見,jetty利用了執行緒池來建立連線,每一個連線任務被當成一個job被放到了job佇列裡面,負責連線的執行緒會從佇列中取出任務來執行,將得到的ServerSocket交給subReactor,下面來看subReactor的實現。
subReactor
這裡需要提一下jetty nio很重要的一個類SelectorManager,它負責channel註冊,select,wakeup等操作。在SelectorManager中有SelectSet陣列,可以把SelectSet理解為SelectorManager的代理,因為真正做事的是SelectSet,這裡面SelectSet設計為一個數組,應該也是分而治之的思想,讓一個selector監聽更少的selectionkey。
SelectSet中有一個非常重要的成員changes,changes中存放了所有有變化的channel、endpoint、attachement。分別在以下情況觸發addChannel方法:當有新的通道加入時,當有新的事件到來時,當有資料到來時。
subReactor的執行流程如下圖:
在這裡導致addChange除了selectorManager.register之外,還有endpoint.updatekey()以及selectionkey資料有變化時等等。
ThreadPool
jetty的執行緒池相當簡單,其實mainReactor與subReactor共用同一個執行緒池,執行緒池的實現類是QueuedThreadPool,當然在jetty.xml中可以設定自己的執行緒池類。簡單看下執行緒池的run方法
private Runnable _runnable = new Runnable()
{
public void run()
{
boolean shrink=false;
try
{
Runnable job=_jobs.poll();
while (isRunning())
{
// Job loop
while (job!=null && isRunning())
{
runJob(job);
job=_jobs.poll();
}
// Idle loop
try
{
_threadsIdle.incrementAndGet();
while (isRunning() && job==null)
{
if (_maxIdleTimeMs<=0)
job=_jobs.take();
else
{
// maybe we should shrink?
final int size=_threadsStarted.get();
if (size>_minThreads)
{
long last=_lastShrink.get();
long now=System.currentTimeMillis();
if (last==0 || (now-last)>_maxIdleTimeMs)
{
shrink=_lastShrink.compareAndSet(last,now) &&
_threadsStarted.compareAndSet(size,size-1);
if (shrink)
return;
}
}
job=idleJobPoll();
}
}
}
finally
{
_threadsIdle.decrementAndGet();
}
}
}
catch(InterruptedException e)
{
...
}
}
};
1、執行緒池有個最小執行緒數_minThreads=8,當執行緒池啟動時會建立_minThreads個執行緒,並啟動它們。第12行,執行緒從任務佇列中取出一個任務,並執行。這裡使用了while迴圈表示這裡會阻塞等待任務執行完,當任務佇列中沒有任務時,才會退出while迴圈;2、退出while迴圈後,這個執行緒就空閒了,在這裡需要有個回收策略,在等待_maxIdleTimeMs時間後,如果當前執行緒數大於_minThreads時,就會回收這個執行緒。
那麼執行緒數什麼時候會大於_minThreads?來看看dispatch()方法中的核心程式碼
// If we had no idle threads or the jobQ is greater than the idle threads
if (idle==0 || jobQ>idle)
{
int threads=_threadsStarted.get();
if (threads<_maxThreads)
startThread(threads);
}
如果沒有空閒的執行緒或者空閒執行緒數太少,在保證執行緒數沒有超過_maxThreads時會新建執行緒。