1. 程式人生 > >netty原始碼解解析(4.0)-6 執行緒模型-IO執行緒EventLoopGroup和NIO實現(一)

netty原始碼解解析(4.0)-6 執行緒模型-IO執行緒EventLoopGroup和NIO實現(一)

介面定義 io.netty.channel.EventLoopGroup extends EventExecutorGroup
方法 說明
ChannelFuture register(Channel channel) 把一個channel註冊到一個EventLoop
ChannelFuture register(Channel channel, ChannelPromise promise);
同上
io.netty.channel.EventLoop extends OrderedEventExecutor, EventLoopGroup
方法 說明
EventLoopGroup parent() 得到建立這個eventLoop的EventLoopGroup
EventLoopGroup定義的主要方法是register, 這個方法的語義是把channel和eventLoop繫結在一起。一個channel對應一個eventLoop, 一個eventLoop會持有多個channel。 I/O執行緒EventLoopGroup的抽象實現 io.netty.channel.MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup io.netty.channel.SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop 兩個類主功能都是實現了EventLoopGroup定義的register方法 MultithreadEventLoopGroup public ChannelFuture register(Channel channel) { return next().register(channel); } public ChannelFuture register(Channel channel, ChannelPromise promise) { return next().register(channel, promise); } SingleThreadEventLoop public ChannelFuture register(Channel channel) { return register(channel, new DefaultChannelPromise(channel, this)); } public ChannelFuture register(final Channel channel, final ChannelPromise promise) { channel.unsafe().register(this, promise); return promise; } register的實現主要是為了呼叫Channel.Unsafe例項的register方法。 NIO實現
io.netty.channel.nio.NioEventLoopGroup extends MultithreadEventLoopGroup io.netty.channel.nio.NioEventLoop extends SingleThreadEventLoop NioEventLoopGroup是在MultithreadEventLoopGroup基礎上實現了對JDK NIO Selector的封裝, 它實現以下幾個功能:
  • 建立selector
  • 在selector上註冊channel感興趣的NIO事件
  • 實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程。
  • 把NIO事件轉換成對channel unsafe的呼叫或NioTask的呼叫
  • 控制執行緒執行I/O操作和排隊任務的用時比例
  • 處理epoll selector cpu 100%的bug
下面來具體分析這幾個功能的實現。 建立Selector NioEventLoop#openSelector()實現了建立selector的功能,預設情況下,使用SelectorProvider#openSelector()方法建立一個新個selector: final Selector unwrappedSelector = provider.openSelector(); 如果設定環境變數io.netty.noKeySetOptimization=true, 會建立一個selectedKeySet = new SelectedSelectionKeySet(), 然後使用java的反射機制把selector的selectedKeys和publicSelectedKeys替換成selectedKeySet,具體步驟是: 1.得到selector的真正型別: sun.nio.ch.SelectorImpl Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( " sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; 2.替換selector是屬性unwrappedSelector Field selectedKeysField = selectorImplClass.getDeclaredField(" selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField(" publicSelectedKeys"); selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); 之所以會設計一個這樣的優化選項,是因為一般情況下呼叫完selector的select或selectNow方法後需要呼叫Selector#selectedKeys()得到觸發NIO事件的的SelectableChannel,這樣優化之後,可以直接從selectedKeySet中得到已經觸發了NIO事件的SelectableChannel。 在selector上註冊channel感興趣的NIO事件 NioEventLoop提供了unwrappedSelector方法,這個方法返回了它建立好的Selector例項。這樣任何的外部類都可以把任意的SelectableChannel註冊到這selector上。在AbstractNioChannel中, doRegister方法的實現就是使用了這個方法: selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 另外,它還提供了一個register方法: public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) 這個方法會把task當成SelectableChannel的附件註冊到selector上: ch.register(selector, interestOps, task); 實現EventExecutor的run方法,定義NIO事件和Executor任務的處理流程 在NioEventLoop的run方法中實現NIO事件和EventExecutor的任務處理邏輯,這個run方法在io.netty.util.concurrent.SingleThreadEventExecutor中定義。在上一章中,我們看到了DefaultEventExecutor中是如何實現這個run方法的,這裡我們將要看到這run方法的另一個實現。和SingleThreadEventExecutor中的run方法相比,NioEventLoop的run方法不僅要及時地執行taskQueue中的任務,還要能及時地處理NIO事件,因此它會同時檢查selector中的NIO事件和和taskQueue佇列,任何一箇中有事件需要處理或有任務需要執行,它不會阻塞執行緒。同時它也保證了在沒有NIO事件和任務的情況下執行緒不會無謂的空轉浪費CUP資源。 run主要實現如下,為了更清晰的說明它的主要功能,我對原來的程式碼進行了一些刪減。 for(;;){ try{ / /phase1: 同時檢查NIO事件和任務 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 在taskQueue中沒有任務的時候執行select } // phase2: 進入處理NIO事件,執行executor任務 try{ // 處理NIO事件 processSelectedKeys(); }finally{ // 處理taskQueu中的任務 runAllTasks(); } }catch(Throwable t){ handleLoopException(t); } } run方法有兩個階段構成: phase1: 檢查NIO事件或executor任務,如果有任何的NIO事件或executor任務進入phase2。 這樣階段的主要工作在selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())和select中完成。 selectStrategy.calculateStrategy實現 selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()) 這行程式碼的含義是: 如果hasTasks() == true, 呼叫以下selector#selectNow, 然後進入phase2。 否則呼叫select。這裡使用了strategy模式,預設的strategy實現是io.netty.channe.DefaultSelectStrategy implements SelectStrategy @Override public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } DefaultSelectStrategy實現了SelectStrategy介面,這介面定義了兩個常量: int SELECT = -1; int CONTINUE = -2; 執行時selectSuppler引數傳入的是selectNowSupplier, 它的實現如下: private final IntSupplier selectNowSupplier = new IntSupplier() { @Override public int get() throws Exception { return selectNow(); } }; 這裡的get方法呼叫了selectNow, selectNow呼叫的是Selector#selectNew方法,這個方法的返回值是>=0。 hashTasks的傳入的引數是hasTask()的返回值: return !taskQueue.isEmpty(); 程式碼讀到這裡就會發現,使用預設的的SelectStrategy實現,calculateStrategy在hasTasks()==true時返回值>=0, hasTasks() == false時返回值是SelectStrategy.SELECT,不會返回SelectStrategy.CONTINUE。 select實現 select的執行邏輯是: 1. 計算超select方法的結束時間selectDeadLineNanos long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); 2. 進入迴圈,檢查超時--超時跳出迴圈。 long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } 3. 如果在select執行過程中有executor任務提交或可以當前的wakeUp由false變成true, 跳出迴圈 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } 4. 呼叫selector#select等待NIO事件。 int selectedKeys = selector.select(timeoutMillis); selectCnt ++; 5. 如果滿足這些條件的任何一個,跳出迴圈: 有NIO事件、wakeUp的新舊值都是true、taskQueue中有任務、有定時任務到期。 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } 6. 如果執行緒被中斷,跳出迴圈。 if (Thread.interrupted()) { break; } 7. 如果selector.select超時,沒有檢查到任何NIO事件, 會在下次迴圈開始時跳出迴圈。 如果每次超時,跳到第2步繼續下一次迴圈。 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } currentTimeNanos = time; select 最遲會在當前時間>= selectDeadLineNanos時返回,這個時間是最近一個到期的定時任務執行的時間,換言之如果沒有任何的NIO事件或executor任務,select會在定時任務到期時返回。如果沒有定時任務,delayNanos(currentTimeNanos)返回的值是 TimeUnit.SECONDS.toNanos(1),即1秒。 select會在檢查到任何NIO事件或executor任務時返回,為了保證這點,在selector.select(timeoutMillis)前後都會呼叫hasTasks檢查executor任務,為了能在呼叫executet提交任務時喚醒selector.select,NioEventLoop覆蓋了SingleThreadEventExecutor的wake方法: protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } 這個方法會及時的喚醒selector.select, 保證新提交的任務可以得到及時的執行。 phase2: 進入處理NIO事件,執行executor任務 這個階段是先呼叫processSelectedKeys()處理NIO事件,然後掉用 runAllTasks()處理所有已經到期的定時任務和已經在排隊的任務。這個階段還實現了NIO事件和executor任務的用時比例管理,這個特性稍後會詳細分析。