1. 程式人生 > >netty原始碼分析之-EventLoop與執行緒模型(1)

netty原始碼分析之-EventLoop與執行緒模型(1)

執行緒模型確定來程式碼的執行方式,我們總是必須規避併發執行可能會帶來的副作用,所以理解netty所採用的併發模型的影響很重要。netty使用了被稱為事件迴圈的EventLoop來執行任務來處理在連線的生命週期內發生的事件

執行緒模型

對於EventLoop在事件迴圈中執行任務可以理解為:

while (!terminated) {
    List<Runnable> readyEvents = blockUntilEventsReady();
    for (Runnable ev: readyEvents) {
        ev.run(); 
    }
}

類的層次圖:
這裡寫圖片描述

互相之間關係描述:

對於java.util.concurrent.Executor:

void execute(Runnable command); 

從方法簽名上就能看出這個是為了支援非同步模式的。command表示一個命令。當前執行緒就是命令者角色,Executor內部的去執行Runnable的執行緒就是執行者。這裡沒有提供明確的地方供命令者去取得命令的執行結果。

ExecutorService 繼承了Executor 介面,增加了對自身生命週期管理的方法,同時提供了一個Future給命令者去獲取命令的執行結果。

ScheduledExecutorService 繼承了ExecutorService介面,增加了對定時任務的支援。

EventExecutorGroup 繼承了ScheduledExecutorService介面,對原來的ExecutorService的關閉介面提供了增強,提供了優雅的關閉介面。從介面名稱上可以看出它是對多個EventExecutor的集合,提供了對多個EventExecutor的迭代訪問介面。

EventExecutor 繼承EventExecutorGroup,可以看作A是B中的一員,但是A也能迭代訪問B中的其他成員。這個繼承關係支援了迭代訪問這個行為。自然的提供了一個parent介面,來獲取所屬的EventExecutorGroup 。

netty中執行緒相關的元件擴充套件了jdk中相應的實現以更加適合來處理netty中的場景。一個 EventLoop 將由一個永遠都不會改變的 Thread 驅動,同時任務 (Runnable 或者 Callable)可以直接提交給 EventLoop 實現,以立即執行或者排程執行。 根據配置和可用核心的不同,可能會建立多個 EventLoop 例項用以優化資源的使用,並且單個EventLoop 可能會被指派用於服務多個 Channel。

在netty4中,所有的I/O操作和事件都由已經被分配給了EventLoop的那個Thread來處理(不是來觸發,寫操作可以從外部的任意執行緒觸發),消除了在多個 ChannelHandler 中進行同步的需要(除了任何可能需要在多個 Channel 中共享的)

可以總結如下:

  1. 一個 EventLoopGroup 包含一個或者多個 EventLoop;
  2. 一個 EventLoop 在它的生命週期內只和一個 Thread 繫結;
  3. 所有由 EventLoop 處理的 I/O 事件都將在它專有的 Thread 上被處理;
  4. 一個 Channel 在它的生命週期內只註冊於一個 EventLoop;
  5. 一個 EventLoop 可能會被分配給一個或多個 Channel。

Netty執行緒模型的卓越效能取決於對於當前執行的Thread的身份的確定 ,如果當前呼叫執行緒正式支撐EventLoop的執行緒,那麼所提交的程式碼將會被(直接)執行。否則EventLoop將排程該任務以便稍後執行,並將它放入到內部佇列中,每個EventLoop都有它自己的任務佇列,當對應的EventLoop下次處理它的事件時就會執行佇列中的那些任務或事件。這也說明了任何的Thread是如何與Channel直接互動而無需在ChannelHandler中進行額外同步的。因此,永遠不要將一個長時執行的任務放入到執行佇列中,因為它將阻塞需要在同一執行緒上執行的任何其他任務,可以通過一個專門的EventExecutor(業務執行緒池)來處理長時的操作

這裡的業務執行緒池有兩種方式能夠實現:
第一種是在ChannelHandler的的回撥方法中,直接使用執行緒池,將需要執行的業務任務提交到執行緒池中。
第二種是藉助於Netty提供的向ChannelPipeline新增ChannelHandler時呼叫的類似於addLast方法來傳遞EventExecutor
預設情況下,呼叫pipeline.addLast(new MyServerHandler()),ChannelHandler中的回撥方法都是由I/O執行緒所執行;如果呼叫ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler… handlers),那麼ChannelHandler中的回撥方法就可以由引數中的group執行緒組來執行

這裡寫圖片描述

  • 非同步傳輸
    非同步傳輸使用來少量的EventLoop(以及和它們相關聯的Thread),一個Thread可以被多個Channel所共享。在建立EventLoopGroup時就直接分配了EventLoop以確保在需要時它們是可用的,EventLoopGroup負責為每個新建立的Channel分配一個EventLoop,一旦一個Channel被分配給來EventLoop,它將在它的整個生命週期中都使用這個EventLoop,正因為如此它可以使我們從ChannelHandler實現中的執行緒安全和同步問題中解脫出來。

    這裡寫圖片描述

  • 阻塞傳輸
    對於像OIO(舊的阻塞I/O)這樣的其他網路模型,Netty會對每一個Channel都分配一個EventLoop,如同之前一樣,確保來每個Channel的I/O事件都將只會被一個Thread處理,這一巧妙的一致性設計對Netty可靠性和易用性做出了巨大貢獻

    這裡寫圖片描述

原始碼分析

引導一個客戶端只需要一個 EventLoopGroup,但是一個 ServerBootstrap 則需要兩個。因為伺服器需要兩組不同的 Channel。第一組將只包含一個 ServerChannel,代表服務 器自身的已繫結到某個本地埠的正在監聽的套接字。而第二組將包含所有已建立的用來處理傳 入客戶端連線(對於每個伺服器已經接受的連線都有一個)的 Channel。

這裡寫圖片描述

對於服務端常見的啟動兩個EventLoopGroup的程式碼:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

bossGroup的作用是接受客戶端的連線,連線的資料到來之後,bossGroup不會作任何處理,而是交給workerGroup來完成處理邏輯。NioEventLoopGroup稱為非同步事件迴圈組,底層實際上就是死迴圈,不停的去偵測底層輸入輸出的事件。可以傳入第一個引數表示執行緒的數量。

EventLoopGroup介面中重要的方法:

    EventLoop next();  1

    ChannelFuture register(Channel channel);  2

    ChannelFuture register(ChannelPromise promise);  3
  1. 返回下一個EventLoop供使用
  2. 註冊一個channel(連線)到EventLoop,並返回一個非同步ChannelFuture,當channel註冊完成之後ChannelFuture將會被通知。也就是呼叫這個方法會立即返回,但是可以通過返回的ChannelFuture來非同步獲取相應的結果資料
  3. 傳入的ChannelPromise本身就繼承了ChannelFuture,因此本身就是一個非同步的物件,並且會持有一個Channel的引用,也就是可以通過呼叫本身的非同步方法來獲取註冊完成之後的資料

如果使用多執行緒來處理,執行緒數量的預設建立邏輯如下:

private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

該方法會呼叫系統執行時的方法,將作業系統的核心數(超核心技術會*2),然後乘以2作為預設的執行緒數。

對於執行緒資源的初始化會通過一個執行緒工廠來建立執行緒,但是不會設計到執行緒啟動的程式碼,只會當有任務提交到來的時候才會啟動初始化過的執行緒:

executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());

SingleThreadEventExecutor

SingleThreadEventExecutor抽象類的定義如下,SingleThreadEventLoop和NioEventLoop一些重要的類都繼承了該類:

public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {}

對於核心方法execute:

@Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

如果當前執行所提交的任務執行緒是EventLoop中的執行緒則直接新增到佇列中去執行,否則嘗試啟動EventLoop中的執行緒然後再新增到佇列中去執行,具體實現如下:

private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //啟動執行緒
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } 
    ...

對於netty執行緒的執行,所有I/O操作以及所觸發的事件操作都是由對應的EventLoop中的執行緒來執行,會在原始碼中多處看到以下類似程式碼片段,來判斷是否當前執行緒是EventLoop中的:

...
        if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    ...