1. 程式人生 > >Netty-NioEventLoop

Netty-NioEventLoop

boolean 方式 ret 反射 promise != ssi body not

Netty 事件循環主邏輯在 NioEventLoop.run 中的 processSelectedKeys函數中

 1 protected void run() {
       //主循環不斷讀取IO事件和task,因為 EventLoop 也是 juc 的 ScheduledExecutorService 實現
2 for (;;) { 3 try { 4 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { 5
case SelectStrategy.CONTINUE: 6 continue; 7 case SelectStrategy.SELECT: 8 select(wakenUp.getAndSet(false)); 9 10 37 38 if (wakenUp.get()) { 39 selector.wakeup();
40 } 41 // fall through 42 default: 43 } 44 45 cancelledKeys = 0; 46 needsToSelectAgain = false;
           // IO事件占總執行時間的百分比 */
47 final int ioRatio = this.ioRatio; 48 if
(ioRatio == 100) { 49 try { 50 processSelectedKeys(); 51 } finally { 52 // Ensure we always run tasks. 53 runAllTasks(); 54 } 55 } else { 56 final long ioStartTime = System.nanoTime(); 57 try { 58 processSelectedKeys(); 59 } finally { 60 // Ensure we always run tasks. 61 final long ioTime = System.nanoTime() - ioStartTime; 62 runAllTasks(ioTime * (100 - ioRatio) / ioRatio); 63 } 64 } 65 } catch (Throwable t) { 66 handleLoopException(t); 67 } 68 // Always handle shutdown even if the loop processing threw an exception. 69 try { 70 if (isShuttingDown()) { 71 closeAll(); 72 if (confirmShutdown()) { 73 return; 74 } 75 } 76 } catch (Throwable t) { 77 handleLoopException(t); 78 } 79 } 80 }

processSelectedKeys 函數 執行時會判斷是否執行優化的版本,即判斷 SelectedSelectionKeySet 是否為空。

是否開啟優化取決於是否設置了環境變量 io.netty.noKeySetOptimization ,默認是 false 代表開啟

private static final boolean DISABLE_KEYSET_OPTIMIZATION =
            SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);

原理是通過反射的方式設置 eventLoop綁定的selector中的 selectKeys屬性 為 SelectedSelectionKeySet ,好處是不用 叠代 selector.selectedKeys()

註入時機為初始化 EventLoop 的時候

 1 private SelectorTuple openSelector() {
 2         12      //註入邏輯40 
41         Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
42             @Override
43             public Object run() {
44                 try {
45                     Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
46                     Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
47 
48                     Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField);
49                     if (cause != null) {
50                         return cause;
51                     }
52                     cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField);
53                     if (cause != null) {
54                         return cause;
55                     }
56 
57                     selectedKeysField.set(unwrappedSelector, selectedKeySet);
58                     publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
59                     return null;
60                 } catch (NoSuchFieldException e) {
61                     return e;
62                 } catch (IllegalAccessException e) {
63                     return e;
64                 }
65             }
66         });
67 
68         ........78     }

處理讀事件 主要在 processSelectedKey 中 ,分別對 讀、寫、連接事件進行了處理。

 
private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC‘ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
//分別處理每個channel的事件 processSelectedKey(k, (AbstractNioChannel) a); }
else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC‘ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1); selectAgain(); i = -1; } } } private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);           //處理了連接事件 unsafe.finishConnect(); } // Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
          
          //將要寫入的buffer flush掉
          ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
         //回調 pipeline 上所有的 ChannelInboundHandler 的 fireChannelRead 和 channelReadComplete 函數 unsafe.read(); } }
catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }

Netty-NioEventLoop