Netty 4.x學習筆記
1、前言
前面兩篇學習筆記已經說完了ByteBuf和Channel和Pipeline,這篇開始講講前面欠的債——執行緒模型(EventLoop和EventExecutor)。
2、Netty執行緒模型
將具體程式碼實現前,先來談談Netty的執行緒模型。正如許多部落格所提到的,Netty採用了Reactor模式,但是許多部落格也只是提到了而已,同時大家也不會忘記附上幾張Doug Lee大神的圖,但是並不會深入的解釋。為了更好的學習和理解Netty的執行緒模型,我在這裡稍微詳細的說一下我對它的理解。
Reactor模式有多個變種,Netty基於Multiple Reactors模式(如下圖)做了一定的修改,Mutilple Reactors模式有多個reactor:mainReactor和subReactor,其中mainReactor負責客戶端的連線請求,並將請求轉交給subReactor,後由subReactor負責相應通道的IO請求,非IO請求(具體邏輯處理)的任務則會直接寫入佇列,等待worker threads進行處理。
Netty的執行緒模型基於Multiple Reactors模式,借用了mainReactor和subReactor的結構,但是從程式碼裡看來,它並沒有Thread Pool這個東東。Netty的subReactor與worker thread是同一個執行緒,採用IO多路複用機制,可以使一個subReactor監聽並處理多個channel的IO請求,我給稱之為:「Single Thread with many Channel」。我根據程式碼整理出下面這種Netty執行緒模型圖:
上圖中的parentGroup和childGroup是Bootstrap構造方法中傳入的兩個物件,這兩個group均是執行緒池,childGroup執行緒池會被各個subReactor充分利用,parentGroup執行緒池則只是在bind某個埠後,獲得其中一個執行緒作為mainReactor。上圖我將subReactor和worker thread合併成了一個個的loop,具體的請求操作均在loop中完成,下文會對loop有個稍微詳細的解釋。
以上均是Nio情況下。Oio採用的是Thread per Channel機制,即每個連線均建立一個執行緒負責該連線的所有事宜。
Doug Lee大神的Reactor介紹:Scalable IO in Java
3、EventLoop和EventExecutor實現
EventLoop和EventExecutor實現共有4個主要邏輯介面,EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup,內部實現、繼承的邏輯表示無法直視,有種擦邊球的感覺。具體的類圖如下:
3.1 EventLoopGroup:
主要方法是newChild,我理解為EventLoop的工廠類。**EventLoopGroup.newChild
**EventLoop
物件。OioEventLoopGroup除外,它沒有實現newChild方法,呼叫父類的並建立ThreadPerChannelEventLoop物件。
3.2 EventLoop:
主要方法是run(),是整個Netty執行過程的邏輯程式碼實現,後面細說。
3.3 EventExecutorGroup:
執行緒池實現,主要成員是children陣列,主要方法是next(),獲得執行緒池中的一個執行緒,由子類呼叫。由於Oio採用的是Thread per Channel機制,所以沒有實現前面兩個。
3.4 EventExecutor:
Task的執行類,主要成員是taskQueue以及真正的執行執行緒物件executor,主要方法是taskQueue操作方法execute、takeTask、addTask等,以及doStartThread方法,後面細說。
4、NioEventLoopGroup實現
這裡以常用的NioEventLoopGroup為例。NioEventLoopGroup在Bootstrap初始化時作為引數傳入構造方法,由於NioEventLoopGroup涉及的程式碼較多,就不大篇幅的貼程式碼了,只寫流程性的文字或相應類和方法:
4.1 mainReactor:
1. Bootstrap.bind(port)
2. Bootstrap.initAndRegister()
2.1 Boostrap.init()
初始化Channel,配置Channel引數,以及Pipeline。其中初始化Pipeline中,需要插入ServerBootstrapAcceptor物件用作acceptor接收客戶端連線請求,acceptor也是一種ChannelInboundHandlerAdapter。
p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions, currentChildAttrs)); } }); |
呼叫channel的unsafe物件註冊selector,具體實現類為AbstractChannel$AbstractUnsafe.register。如下:
public final void register(final ChannelPromise promise) { if (eventLoop.inEventLoop()) { // 是否在Channel的loop中 register0(promise); } else { // 不在 try { eventLoop.execute(new Runnable() { // EventLoop執行一個任務 @Override public void run() { register0(promise); } }); } catch (Throwable t) { // ... } } } |
eventLoop.execute(runnable);是比較重要的一個方法。在沒有啟動真正執行緒時,它會啟動執行緒並將待執行任務放入執行佇列裡面。啟動真正執行緒(startThread())會判斷是否該執行緒已經啟動,如果已經啟動則會直接跳過,達到執行緒複用的目的。啟動的執行緒,主要呼叫方法是NioEventLoop的run()方法,run()方法在下面有詳細介紹:
public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); // 啟動執行緒 addTask(task); // 新增任務佇列 // ... } if (!addTaskWakesUp) { wakeup(inEventLoop); } } |
2.2 group().register(channel)
將 channel 註冊到下一個 EventLoop 中。
3. 接收連線請求
由NioEventLoop.run()接收到請求:
3.1 AbstractNioMessageChannel$NioMessageUnsafe.read()
3.2 NioServerSocketChannel.doReadMessages()
獲得childEventLoopGroup中的EventLoop,並依據該loop建立新的SocketChannel物件。
3.3 pipeline.fireChannelRead(readBuf.get(i));
readBuf.get(i)就是3.2中建立的SocketChannel物件。在2.2初始化Bootstrap的時候,已經將acceptor處理器插入pipeline中,所以理所當然,這個SocketChannel物件由acceptor處理器處理。
3.4 ServerBootstrapAcceptor$ServerBootstrapAcceptor.channelRead();
該方法流程與2.2、2.3類似,初始化子channel,並註冊到相應的selector。註冊的時候,也會呼叫eventLoop.execute用以執行註冊任務,execute時,啟動子執行緒。即啟動了subReactor。
4.2 subReactor:
subReactor的流程較為簡單,主體完全依賴於loop,用以執行read、write還有自定義的NioTask操作,就不深入了,直接跳過解釋loop過程。
loop:
loop是我自己提出來的元件,僅是代表subReactor的主要執行邏輯。例子可以參考NioEventLoop.run()。
loop會不斷迴圈一個過程:select -> processSelectedKeys(IO操作) -> runAllTasks(非IO操作),如下程式碼:
protected void run() { for (;;) { // ... try { if (hasTasks()) { // 如果佇列中仍有任務 selectNow(); } else { select(); // ... } // ... final long ioStartTime = System.nanoTime(); // 用以控制IO任務與非IO任務的執行時間比 needsToSelectAgain = false; // IO任務 if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } final long ioTime = System.nanoTime() - ioStartTime; final int ioRatio = this.ioRatio; // 非IO任務 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { // ... } } } |
就目前而言,基本上IO任務都會走processSelectedKeysOptimized方法,該方法即代表使用了優化的SelectedKeys。除非採用了比較特殊的JDK實現,基本都會走該方法。
- selectedKeys在openSelector()方法中初始化,Netty通過反射修改了Selector的selectedKeys成員和publicSelectedKeys成員。替換成了自己的實現——SelectedSelectionKeySet。
- 從OpenJDK 6/7的SelectorImpl中可以看到,selectedKeys和publicSeletedKeys均採用了HashSet實現。HashSet採用HashMap實現,插入需要計算Hash並解決Hash衝突並掛鏈,而SelectedSelectionKeySet實現使用了雙陣列,每次插入尾部,擴充套件策略為double,呼叫flip()則返回當前陣列並切換到另外一個數據。
- ByteBuf中去掉了flip,在這裡是否也可以呢?
processSelectedKeysOptimized主要流程如下:
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } |
在獲得attachment後,判斷是Channel呢還是其他,其他則是NioTask。找遍程式碼並沒有發現Netty有註冊NioTask的行為,同時也沒發現NioTask的實現類。只有在NioEventLoop.register方法中有註冊NioTask至selector的行為,便判斷該行為是由使用者呼叫,可以針對某個Channel註冊自己的NioTask。這裡就只講第一個processSelectdKey(k, (AbstractNioChannel) a),但程式碼就不貼了。
和常規的NIO程式碼類似,processSelectdKey是判斷SeletedKeys的readyOps,並做出相應的操作。操作均是unsafe做的。如read可以參考:AbstractNioByteChannel$NioByteUnsafe.read()。IO操作的流程大致都是:
- 獲得資料
- 呼叫pipeline的方法,
fireChannel***
- 插入任務佇列
執行完所有IO操作後,開始執行非IO任務(runAllTasks)。Netty會控制IO和非IO任務的比例,ioTime * (100 - ioRatio) / ioRatio,預設ioRatio為50。runAllTasks乃是父類SingleThreadExecutor的方法。方法主體很簡單,將任務從TaskQueue拎出來,直接呼叫任務的run方法即可。
程式碼呼叫的是task.run(),而不是task.start()。即是單執行緒執行所有任務
protected boolean runAllTasks(long timeoutNanos) { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null) { return false; } // 控制時間 final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0; long lastExecutionTime; for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } runTasks ++; // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.lastExecutionTime = lastExecutionTime; return true; } |
5、總結
以上內容從設計和程式碼層面總結Netty執行緒模型的大致內容,中間有很多我的不成熟的思考與理解,請朋友輕拍與指正。
看原始碼過程中是比較折磨人的。首先得了解你學習東西的業務價值是哪裡?即你學了這個之後能用在哪裡,只是不考慮場景僅僅為了看程式碼而看程式碼比較難以深入理解其內涵;其次,看程式碼一定一定得從邏輯、結構層面看,從細節層面看只會越陷越深,有種一葉障目不見泰山的感覺;最後,最好是能夠將程式碼邏輯、結構畫出來,或者整理出思維導圖啥的,可以用以理清思路。前面兩篇文章思維道路較為清晰,執行緒模型的導圖有一些但是比較混亂,就不貼出來了,用作自己參考,有興趣的可以找我要噢。
http://hongweiyi.com/2014/01/netty-4-x-thread-model/