死磕Netty原始碼之Reactor執行緒模型詳解(一)NioEventLoop的啟動
前言
Netty中最核心的就是Reactor執行緒,對應Netty中的程式碼就是NioEventLoop。NioEventLoop是通過NioEventLoopGroup進行維護的,所以在介紹NioEventLoop前我們先介紹一下NioEventLoopGroup
NioEventLoopGroup建立
NioEventLoopGroup在客戶端/服務端初始化時建立
EventLoopGroup bossGroup = new NioEventLoopGroup();
以下是NioEventLoopGroup繼承關係圖
頂層介面是Executor是JDK中的類可知EventLoopGroup支援執行一個非同步任務,接下來是ScheduledExecutorService看名字可知子類將支援任務的排程執行,接下來我們繼續跟進EventLoopGroup的構造方法,它最終呼叫到MultithreadEventLoopGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
children[i] = newChild(executor, args);
success = true;
}
chooser = chooserFactory.newChooser(children);
}
通過呼叫鏈不難發現這裡傳遞進來的executor為null。所以在MultithreadEventExecutorGroup構造方法中主要做了3件事
1.建立執行緒執行器ThreadPerTaskExecutor 2.建立NioEventLoop陣列 3.初始化NioEventLoop陣列 4.初始化執行緒選擇器
建立執行緒執行器
執行緒執行器通過呼叫ThreadPerTaskExecutor建構函式進行初始化
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
在呼叫建構函式時傳遞的引數ThreadFactory為DefaultThreadFactory例項
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
它主要功能是通過ThreadFactory新建執行緒並執行任務,Netty中的預設NIO執行緒都是由DefaultThreadFactory.newThread()建立
public Thread newThread(Runnable r) {
Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
Netty對執行緒進行了一層封裝及一些屬性設定,這些引數是在DefaultThreadFactory的構造方法中被初始化的
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
// ...
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}
我們重點關注一下新建執行緒的執行緒名prefix + nextId.incrementAndGet()到底是什麼?跟蹤程式碼發現prefix的規則是poolName和poolId(自增)通過’-‘連線起來的,通過呼叫鏈可以知道此處的poolName為’nioEventLoopGroup’。所以Netty新建的NIO執行緒預設名稱為nioEventLoopGroup-nioEventLoopId-自增ID,如nioEventLoopGroup-2-1
建立NioEventLoop陣列
children = new EventExecutor[nThreads];
關於nThreads如果使用者顯示指定nThreads數量那就按照使用者指定的設定,否則這個值將是CPU核數的兩倍。由於我們在建立NioEventLoopGroup時未傳遞任何引數,所以此處的nThreads為2倍的CPU核數,相關程式碼如下
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
children是EventExecutor陣列,但是這裡陣列中的每個元素其實都是NioEventLoop例項
初始化NioEventLoop陣列
children陣列的初始化是在以下程式碼中完成的
children[i] = newChild(executor, args);
我們跟進newChild()它最後呼叫的是NioEventLoopGroup的newChild方法
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0], ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
newChild方法最後呼叫了NioEventLoop的構造方法,以下是NioEventLoop繼承關係圖
NioEventLoop的構造方法程式碼如下
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
可以看到這裡打開了一個Selector,也就是說每一個NioEventLoop都與一個Selector繫結
初始化執行緒選擇器
初始化執行緒選擇器在如下程式碼中完成
chooser = chooserFactory.newChooser(children);
繼續跟進newChooser方法,程式碼如下
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
我們可以發現Netty通過判斷執行緒個數nThreads是否為2的冪次方來選擇chooser,接下來我們分析兩個chooser
private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// 利用2的N次方法的特點,使用&比求餘更快
return children[childIndex.getAndIncrement() & children.length - 1];
}
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// 使用求餘方式
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
至此我們已經完成了NioEventLoopGroup的建立,並在NioEventLoopGroup建立過程中完成了NioEventLoop的初始化工作
NioEventLoop啟動
NioEventLoop的run方法是Reactor執行緒的主體,在第一次新增任務的時候被啟動
public void execute(Runnable task) {
// ...
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
}
// ...
}
外部執行緒在往任務佇列裡面新增任務的時候執行startThread(),程式碼如下
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
在startThread()方法中Netty會判斷Reactor執行緒有沒有被啟動,如果還未啟動則先通過CAS方式將STATE_UPDATER的值設定為ST_START然後啟動執行緒(CAS確保下次有新任務執行的時候再呼叫這個方法不會再次去啟動執行緒),接下來分析doStartThread()
private void doStartThread() {
...
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
...
SingleThreadEventExecutor.this.run();
...
}
}
}
在這裡Netty沒有使用傳統的執行緒建立方式來執行run方法,而是通過一個執行緒執行器executor來執行,其實是因為executor底層對執行緒做了一層優化,此處的executor就是上文中介紹到的ThreadPerTaskExecutor,它在每次執行execute方法的時候都會通過DefaultThreadFactory建立一個FastThreadLocalThread執行緒,而這個執行緒就是Netty中的Reactor執行緒實體
至此NioEventLoop啟動完畢,在下一篇部落格中將介紹NioEventLoop的執行過程