netty原始碼分析之-EventLoop與執行緒模型(1)
執行緒模型確定來程式碼的執行方式,我們總是必須規避併發執行可能會帶來的副作用,所以理解netty所採用的併發模型的影響很重要。netty使用了被稱為事件迴圈的EventLoop來執行任務來處理在連線的生命週期內發生的事件
執行緒模型
對於EventLoop在事件迴圈中執行任務可以理解為:
while (!terminated) {
List<Runnable> readyEvents = blockUntilEventsReady();
for (Runnable ev: readyEvents) {
ev.run();
}
}
類的層次圖:
互相之間關係描述:
對於java.util.concurrent.Executor:
void execute(Runnable command);
從方法簽名上就能看出這個是為了支援非同步模式的。command表示一個命令。當前執行緒就是命令者角色,Executor內部的去執行Runnable的執行緒就是執行者。這裡沒有提供明確的地方供命令者去取得命令的執行結果。
ExecutorService 繼承了Executor 介面,增加了對自身生命週期管理的方法,同時提供了一個Future給命令者去獲取命令的執行結果。
ScheduledExecutorService 繼承了ExecutorService介面,增加了對定時任務的支援。
EventExecutorGroup 繼承了ScheduledExecutorService介面,對原來的ExecutorService的關閉介面提供了增強,提供了優雅的關閉介面。從介面名稱上可以看出它是對多個EventExecutor的集合,提供了對多個EventExecutor的迭代訪問介面。
EventExecutor 繼承EventExecutorGroup,可以看作A是B中的一員,但是A也能迭代訪問B中的其他成員。這個繼承關係支援了迭代訪問這個行為。自然的提供了一個parent介面,來獲取所屬的EventExecutorGroup 。
netty中執行緒相關的元件擴充套件了jdk中相應的實現以更加適合來處理netty中的場景。一個 EventLoop 將由一個永遠都不會改變的 Thread 驅動,同時任務 (Runnable 或者 Callable)可以直接提交給 EventLoop 實現,以立即執行或者排程執行。 根據配置和可用核心的不同,可能會建立多個 EventLoop 例項用以優化資源的使用,並且單個EventLoop 可能會被指派用於服務多個 Channel。
在netty4中,所有的I/O操作和事件都由已經被分配給了EventLoop的那個Thread來處理(不是來觸發,寫操作可以從外部的任意執行緒觸發),消除了在多個 ChannelHandler 中進行同步的需要(除了任何可能需要在多個 Channel 中共享的)
可以總結如下:
- 一個 EventLoopGroup 包含一個或者多個 EventLoop;
- 一個 EventLoop 在它的生命週期內只和一個 Thread 繫結;
- 所有由 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理;
- 一個 Channel 在它的生命週期內只註冊於一個 EventLoop;
- 一個 EventLoop 可能會被分配給一個或多個 Channel。
Netty執行緒模型的卓越效能取決於對於當前執行的Thread的身份的確定 ,如果當前呼叫執行緒正式支撐EventLoop的執行緒,那麼所提交的程式碼將會被(直接)執行。否則EventLoop將排程該任務以便稍後執行,並將它放入到內部佇列中,每個EventLoop都有它自己的任務佇列,當對應的EventLoop下次處理它的事件時就會執行佇列中的那些任務或事件。這也說明了任何的Thread是如何與Channel直接互動而無需在ChannelHandler中進行額外同步的。因此,永遠不要將一個長時執行的任務放入到執行佇列中,因為它將阻塞需要在同一執行緒上執行的任何其他任務,可以通過一個專門的EventExecutor(業務執行緒池)來處理長時的操作
這裡的業務執行緒池有兩種方式能夠實現:
第一種是在ChannelHandler的的回撥方法中,直接使用執行緒池,將需要執行的業務任務提交到執行緒池中。
第二種是藉助於Netty提供的向ChannelPipeline新增ChannelHandler時呼叫的類似於addLast方法來傳遞EventExecutor
預設情況下,呼叫pipeline.addLast(new MyServerHandler()),ChannelHandler中的回撥方法都是由I/O執行緒所執行;如果呼叫ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler… handlers),那麼ChannelHandler中的回撥方法就可以由引數中的group執行緒組來執行
非同步傳輸
非同步傳輸使用來少量的EventLoop(以及和它們相關聯的Thread),一個Thread可以被多個Channel所共享。在建立EventLoopGroup時就直接分配了EventLoop以確保在需要時它們是可用的,EventLoopGroup負責為每個新建立的Channel分配一個EventLoop,一旦一個Channel被分配給來EventLoop,它將在它的整個生命週期中都使用這個EventLoop,正因為如此它可以使我們從ChannelHandler實現中的執行緒安全和同步問題中解脫出來。阻塞傳輸
對於像OIO(舊的阻塞I/O)這樣的其他網路模型,Netty會對每一個Channel都分配一個EventLoop,如同之前一樣,確保來每個Channel的I/O事件都將只會被一個Thread處理,這一巧妙的一致性設計對Netty可靠性和易用性做出了巨大貢獻
原始碼分析
引導一個客戶端只需要一個 EventLoopGroup,但是一個 ServerBootstrap 則需要兩個。因為伺服器需要兩組不同的 Channel。第一組將只包含一個 ServerChannel,代表服務 器自身的已繫結到某個本地埠的正在監聽的套接字。而第二組將包含所有已建立的用來處理傳 入客戶端連線(對於每個伺服器已經接受的連線都有一個)的 Channel。
對於服務端常見的啟動兩個EventLoopGroup的程式碼:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
bossGroup的作用是接受客戶端的連線,連線的資料到來之後,bossGroup不會作任何處理,而是交給workerGroup來完成處理邏輯。NioEventLoopGroup稱為非同步事件迴圈組,底層實際上就是死迴圈,不停的去偵測底層輸入輸出的事件。可以傳入第一個引數表示執行緒的數量。
EventLoopGroup介面中重要的方法:
EventLoop next(); 1
ChannelFuture register(Channel channel); 2
ChannelFuture register(ChannelPromise promise); 3
- 返回下一個EventLoop供使用
- 註冊一個channel(連線)到EventLoop,並返回一個非同步ChannelFuture,當channel註冊完成之後ChannelFuture將會被通知。也就是呼叫這個方法會立即返回,但是可以通過返回的ChannelFuture來非同步獲取相應的結果資料
- 傳入的ChannelPromise本身就繼承了ChannelFuture,因此本身就是一個非同步的物件,並且會持有一個Channel的引用,也就是可以通過呼叫本身的非同步方法來獲取註冊完成之後的資料
如果使用多執行緒來處理,執行緒數量的預設建立邏輯如下:
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
該方法會呼叫系統執行時的方法,將作業系統的核心數(超核心技術會*2),然後乘以2作為預設的執行緒數。
對於執行緒資源的初始化會通過一個執行緒工廠來建立執行緒,但是不會設計到執行緒啟動的程式碼,只會當有任務提交到來的時候才會啟動初始化過的執行緒:
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
SingleThreadEventExecutor
SingleThreadEventExecutor抽象類的定義如下,SingleThreadEventLoop和NioEventLoop一些重要的類都繼承了該類:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {}
對於核心方法execute:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
如果當前執行所提交的任務執行緒是EventLoop中的執行緒則直接新增到佇列中去執行,否則嘗試啟動EventLoop中的執行緒然後再新增到佇列中去執行,具體實現如下:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//啟動執行緒
SingleThreadEventExecutor.this.run();
success = true;
}
...
對於netty執行緒的執行,所有I/O操作以及所觸發的事件操作都是由對應的EventLoop中的執行緒來執行,會在原始碼中多處看到以下類似程式碼片段,來判斷是否當前執行緒是EventLoop中的:
...
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
...