Netty原始碼分析 (一)----- NioEventLoopGroup
提到Netty首當其衝被提起的肯定是支援它承受高併發的執行緒模型,說到執行緒模型就不得不提到NioEventLoopGroup
這個執行緒池,接下來進入正題。
執行緒模型
首先來看一段Netty的使用示例
package com.wrh.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public final class SimpleServer { public static void main(String[] args) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new SimpleServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { } }); ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private static class SimpleServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerAdded"); } } }
下面將分析第一、二行程式碼,看下NioEventLoopGroup類的建構函式幹了些什麼。其餘的部分將在其他博文中分析。
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup();
從程式碼中可以看到這裡使用了兩個執行緒池bossGroup
和workerGroup
,那麼為什麼需要定義兩個執行緒池呢?這就要說到Netty的執行緒模型了。
Netty的執行緒模型被稱為Reactor模型,具體如圖所示,圖上的mainReactor指的就是bossGroup
workerGroup
,負責處理已建立的客戶端通道上的資料讀寫;圖上還有一塊ThreadPool是具體的處理業務邏輯的執行緒池,一般情況下可以複用subReactor,比我的專案中就是這種用法,但官方建議處理一些較為耗時的業務時還是要使用單獨的ThreadPool。
NioEventLoopGroup建構函式
NioEventLoopGroup的建構函式的程式碼如下
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, null); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); }
NioEventLoopGroup類中的建構函式最終都是呼叫的父類MultithreadEventLoopGroup如下的建構函式:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }
從上面的建構函式可以得到 如果使用EventLoopGroup workerGroup = new NioEventLoopGroup()
來建立物件,即不指定執行緒個數,則netty給我們使用預設的執行緒個數,如果指定則用我們指定的執行緒個數。
預設執行緒個數相關的程式碼如下:
static { DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt( "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); if (logger.isDebugEnabled()) { logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS); } }
而SystemPropertyUtil.getInt函式的功能為:得到系統屬性中指定key(這裡:key=”io.netty.eventLoopThreads”)所對應的value,如果獲取不到獲取失敗則返回預設值,這裡的預設值為:cpu的核數的2倍。
結論:如果沒有設定程式啟動引數(或者說沒有指定key=”io.netty.eventLoopThreads”的屬性值),那麼預設情況下執行緒的個數為cpu的核數乘以2。
繼續看,由於MultithreadEventLoopGroup的建構函式是呼叫的是其父類MultithreadEventExecutorGroup的建構函式,因此,看下此類的建構函式
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; //根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } //產生nTreads個NioEventLoop物件儲存在children陣列中 for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { //如果newChild方法執行失敗,則對前面執行new成功的幾個NioEventLoop進行shutdown處理 if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } }
該建構函式幹了如下三件事:
1、產生了一個執行緒工場:threadFactory = newDefaultThreadFactory();
MultithreadEventExecutorGroup.java protected ThreadFactory newDefaultThreadFactory() { return new DefaultThreadFactory(getClass());//getClass()為:NioEventLoopGroup.class } DefaultThreadFactory.java public DefaultThreadFactory(Class<?> poolType) { this(poolType, false, Thread.NORM_PRIORITY); }
2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser
private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }
3、 產生nTreads個NioEventLoop物件儲存在children陣列中 ,執行緒都是通過呼叫newChild方法來產生的。
@Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); }
這裡傳給NioEventLoop建構函式的引數為:NioEventLoopGroup、DefaultThreadFactory、SelectorProvider。
NioEventLoop建構函式分析
既然上面提到來new一個NioEventLoop物件,下面我們就看下這個類以及其父類。
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); }
繼續看父類 SingleThreadEventLoop的建構函式
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { super(parent, threadFactory, addTaskWakesUp); }
又是直接呼叫來父類SingleThreadEventExecutor的建構函式,繼續看
protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } this.parent = parent; this.addTaskWakesUp = addTaskWakesUp;//false thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { //呼叫NioEventLoop類的run方法 SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) { break; } } // Check if confirmShutdown() was called at the end of the loop. if (success && gracefulShutdownStartTime == 0) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates."); } try { // Run all remaining tasks and shutdown hooks. for (;;) { if (confirmShutdown()) { break; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn( "An event executor terminated with " + "non-empty task queue (" + taskQueue.size() + ')'); } terminationFuture.setSuccess(null); } } } } }); taskQueue = newTaskQueue(); } protected Queue<Runnable> newTaskQueue() { return new LinkedBlockingQueue<Runnable>(); }
主要幹如下兩件事:
1、利用ThreadFactory建立來一個Thread,傳入了一個Runnable物件,該Runnable重寫的run程式碼比較長,不過重點僅僅是呼叫NioEventLoop類的run方法。
2、使用LinkedBlockingQueue類初始化taskQueue 。
其中,newThread方法的程式碼如下:
DefaultThreadFactory.java
@Override public Thread newThread(Runnable r) { Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet()); try { //判斷是否是守護執行緒,並進行設定 if (t.isDaemon()) { if (!daemon) { t.setDaemon(false); } } else { if (daemon) { t.setDaemon(true); } } //設定其優先順序 if (t.getPriority() != priority) { t.setPriority(priority); } } catch (Exception ignored) { // Doesn't matter even if failed to set. } return t; } protected Thread newThread(Runnable r, String name) { return new FastThreadLocalThread(r, name); }
FastThreadLocalThread.java
public FastThreadLocalThread(Runnable target, String name) { super(target, name);// FastThreadLocalThread extends Thread }
到這裡,可以看到底層還是藉助於類似於Thread thread = new Thread(r)這種方式來建立執行緒。
關於NioEventLoop物件可以得到的點有,初始化了如下4個屬性。
1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)
2、selector
3、provider
4、thread (在父類SingleThreadEventExecutor中)
總結
關於NioEventLoopGroup,總結如下
1、 如果不指定執行緒數,則執行緒數為:CPU的核數*2
2、根據執行緒個數是否為2的冪次方,採用不同策略初始化chooser
3、產生nThreads個NioEventLoop物件儲存在children陣列中。
可以理解NioEventLoop就是一個執行緒,執行緒NioEventLoop中裡面有如下幾個屬性:
1、NioEventLoopGroup (在父類SingleThreadEventExecutor中)
2、selector
3、provider
4、thread (在父類SingleThreadEventExecutor中)
更通俗點就是:NioEventLoopGroup就是一個執行緒池,NioEventLoop就是一個執行緒。NioEventLoopGroup執行緒池中有N個NioEventLoop執行緒。