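[1] Netty 4 Server Startup Source Code Analysis: Thread Creation
Reposted from http://xw-z1985.iteye.com/blog/1925013
This article analyzes how the boss and worker threads are created in Netty.
The following line comes from the server startup code; thread creation happens inside it.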
EventLoopGroup bossGroup = new NioEventLoopGroup();
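Class hierarchy of NioEventLoopGroup (diagram not reproduced): as the constructors below show, NioEventLoopGroup extends MultithreadEventLoopGroup, which in turn extends MultithreadEventExecutorGroup.
The constructor chain executes as follows: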
// NioEventLoopGroup
public NioEventLoopGroup() {
    this(0);
}
public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
    this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
        int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
    super(nThreads, threadFactory, selectorProvider);
}
Next, look at the constructor of the parent class MultithreadEventLoopGroup:
// MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Note: if the number of threads is not specified (nThreads is 0), the number of threads created defaults to DEFAULT_EVENT_LOOP_THREADS, whose value is the number of processors x 2.
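The fallback rule in the note above can be sketched with the plain JDK. This is a minimal illustration, not Netty's code: the class and method names are hypothetical, and the override through the `io.netty.eventLoopThreads` system property is an assumption based on Netty 4.0 releases rather than something shown in the source quoted here.

```java
final class DefaultThreadCount {
    /**
     * Hypothetical helper: the default is "processors x 2", never less than 1.
     * Assumption: the value can be overridden with -Dio.netty.eventLoopThreads=N.
     */
    public static int defaultThreads() {
        int fallback = Runtime.getRuntime().availableProcessors() * 2;
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", fallback));
    }

    /** Mirrors the ternary in the constructor above: 0 means "use the default". */
    public static int resolve(int nThreads) {
        return nThreads == 0 ? defaultThreads() : nThreads;
    }

    public static void main(String[] args) {
        System.out.println("resolve(0) = " + resolve(0)); // machine dependent
        System.out.println("resolve(4) = " + resolve(4));
    }
}
```

An explicit count is passed through unchanged; only 0 triggers the default. A negative count is also passed through and is rejected later by the nThreads <= 0 check in MultithreadEventExecutorGroup.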
Now look at the constructor of MultithreadEventExecutorGroup, the parent class of MultithreadEventLoopGroup:
/**
 * Create a new instance.
 *
 * @param nThreads the number of threads that will be used by this instance.
 * @param threadFactory the ThreadFactory to use, or {@code null} if the default should be used.
 * @param args arguments which will be passed to each {@link #newChild(ThreadFactory, Object...)} call
 */
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    children = new SingleThreadEventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }
                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }
    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };
    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}
The children array holds the created executors, each of which owns one thread. Every element is created by children[i] = newChild(threadFactory, args), and newChild is implemented by the subclass NioEventLoopGroup:
// NioEventLoopGroup
protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
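The pattern used by the two listings above (the base class loops, the subclass supplies newChild, and children that were already created are shut down if a later one fails) can be sketched without Netty. Everything here is hypothetical illustration: Child, MiniGroup and FailingGroup are invented names, and Child only records whether it was shut down instead of owning a real thread. The sketch also suggests one likely reason for the Object... args parameter: the subclass's own fields are not yet initialized while the superclass constructor runs, so per-child parameters have to travel through the call.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an event executor; it only tracks shutdown. */
class Child {
    final int id;
    boolean shutdown;
    Child(int id) { this.id = id; }
    void shutdownGracefully() { shutdown = true; }
}

abstract class MiniGroup {
    final Child[] children;

    protected MiniGroup(int nThreads, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        children = new Child[nThreads];
        for (int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(i, args); // subclass decides the concrete child
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child", e);
            } finally {
                if (!success) {
                    // Roll back: shut down every child created before the failure.
                    for (int j = 0; j < i; j++) {
                        children[j].shutdownGracefully();
                    }
                }
            }
        }
    }

    protected abstract Child newChild(int index, Object... args) throws Exception;
}

/** Test double that fails at a chosen index and records what it created. */
class FailingGroup extends MiniGroup {
    FailingGroup(int nThreads, int failAt, List<Child> created) {
        super(nThreads, failAt, created); // parameters travel through args
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Child newChild(int index, Object... args) throws Exception {
        int failAt = (Integer) args[0];
        List<Child> created = (List<Child>) args[1];
        if (index == failAt) {
            throw new Exception("simulated failure at child " + index);
        }
        Child child = new Child(index);
        created.add(child);
        return child;
    }
}

class MiniGroupDemo {
    public static void main(String[] args) {
        List<Child> created = new ArrayList<>();
        try {
            new FailingGroup(4, 2, created);
        } catch (IllegalStateException e) {
            System.out.println("construction failed: " + e.getCause().getMessage());
        }
        for (Child c : created) {
            System.out.println("child " + c.id + " shutdown=" + c.shutdown);
        }
    }
}
```

Unlike the real constructor, this sketch does not wait for the rolled-back children to terminate, because the stand-in children have no threads to wait for.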
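The actual type of each element is NioEventLoop. The class hierarchy of NioEventLoop is as follows (diagram not reproduced):
(Note: this design is a little odd. For example, EventLoop extends EventLoopGroup, and I am not sure why. Is it only so that EventLoop can offer a parent() method that returns its EventLoopGroup? To be confirmed later.)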
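One plausible reading of the question above, offered as an assumption rather than something the quoted source confirms: a single loop can be treated as a group with exactly one member, so code written against the group type accepts either a whole group or one loop. The interfaces below are hypothetical and much smaller than Netty's; they only illustrate that idea.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified counterpart of a group: it hands out loops. */
interface Group {
    Loop next();
}

/** Hypothetical counterpart of a loop: also a group, with itself as the only member. */
interface Loop extends Group {
    Group parent();
}

class SingleLoop implements Loop {
    private final Group parent;
    SingleLoop(Group parent) { this.parent = parent; }
    @Override public Group parent() { return parent; }
    @Override public Loop next() { return this; } // a group of one always picks itself
}

class RoundRobinGroup implements Group {
    private final Loop[] children;
    private final AtomicInteger index = new AtomicInteger();

    RoundRobinGroup(int n) {
        children = new Loop[n];
        for (int i = 0; i < n; i++) {
            children[i] = new SingleLoop(this);
        }
    }

    @Override public Loop next() {
        // floorMod keeps the index valid even after the counter overflows.
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }
}

class GroupOfOneDemo {
    /** Accepts a whole group or a single loop, because both are Groups. */
    static Loop pick(Group group) {
        return group.next();
    }

    public static void main(String[] args) {
        RoundRobinGroup group = new RoundRobinGroup(2);
        Loop first = pick(group);
        System.out.println("picking from a loop returns itself: " + (pick(first) == first));
        System.out.println("loop knows its parent: " + (first.parent() == group));
    }
}
```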
Next, look at the constructor of NioEventLoop:
// NioEventLoop
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    selector = openSelector();
}
First, consider selector = openSelector():
// NioEventLoop
private Selector openSelector() {
    final Selector selector;
    try {
        selector = provider.openSelector();
    } catch (IOException e) {
        throw new ChannelException("failed to open a new selector", e);
    }
    if (DISABLE_KEYSET_OPTIMIZATION) {
        return selector;
    }
    try {
        SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
        Class<?> selectorImplClass =
                Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
        // Note: the boolean result of this check is discarded in the version quoted here.
        selectorImplClass.isAssignableFrom(selector.getClass());
        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
        selectedKeysField.setAccessible(true);
        publicSelectedKeysField.setAccessible(true);
        selectedKeysField.set(selector, selectedKeySet);
        publicSelectedKeysField.set(selector, selectedKeySet);
        selectedKeys = selectedKeySet;
        logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
    } catch (Throwable t) {
        selectedKeys = null;
        logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
    }
    return selector;
}
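This method optimizes the selectedKeys and publicSelectedKeys fields of sun.nio.ch.SelectorImpl by replacing both with a single SelectedSelectionKeySet, which is also stored in the NioEventLoop field selectedKeys. What does the optimization consist of? (Two internal arrays for storage? An initial array size of 1024 to avoid frequent resizing? Doubling the array once its size exceeds 1024?)
Because the set is injected through reflection, the selector itself adds keys to it: once a SelectionKey registered with the selector becomes ready, selectedKeys is no longer empty, and events are later dispatched based on selectedKeys.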
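The reflection trick can be tried in isolation. The sketch below does not touch sun.nio.ch.SelectorImpl (recent JDKs restrict reflective access to it); instead it uses a hypothetical FakeSelector with a private Set field and swaps in an array-backed set that grows by doubling, in the spirit of the guesses above. All class names are invented for illustration, the initial capacity of 4 is chosen only to keep the demo small, and the set is append-only and does not deduplicate, which is a deliberate simplification.

```java
import java.lang.reflect.Field;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

/** Hypothetical stand-in for the JDK selector: it adds ready keys to a private Set. */
class FakeSelector {
    private Set<String> selectedKeys = new HashSet<>();
    void markReady(String key) { selectedKeys.add(key); }
}

/** Append-only, array-backed set that doubles its capacity when full. */
class ArrayKeySet extends AbstractSet<String> {
    private String[] keys = new String[4]; // small on purpose, for the demo
    private int size;

    @Override public boolean add(String key) {
        if (key == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length * 2); // double the capacity
        }
        keys[size++] = key;
        return true;
    }

    @Override public int size() { return size; }

    @Override public Iterator<String> iterator() {
        return Arrays.asList(keys).subList(0, size).iterator();
    }

    int capacity() { return keys.length; }
}

class SelectorInstrumentation {
    /** Replaces the selector's private set and returns the replacement to the caller. */
    static ArrayKeySet instrument(FakeSelector selector) {
        try {
            ArrayKeySet optimized = new ArrayKeySet();
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, optimized);
            return optimized;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("failed to instrument selector", e);
        }
    }

    public static void main(String[] args) {
        FakeSelector selector = new FakeSelector();
        ArrayKeySet keys = instrument(selector);
        for (int i = 0; i < 5; i++) {
            selector.markReady("key-" + i); // the selector fills our set directly
        }
        System.out.println("ready keys: " + keys.size() + ", capacity: " + keys.capacity());
    }
}
```

The caller keeps a direct reference to the injected set, so it can read the ready keys from its own array without going through the selector's public view. That is the role the selectedKeys field plays in NioEventLoop.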
Finally, consider super(parent, threadFactory, false), which ends up in the constructor of the ancestor class SingleThreadEventExecutor:
/**
 * Create a new instance
 *
 * @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
 * @param threadFactory the {@link ThreadFactory} which will be used for the used {@link Thread}
 * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
 * executor thread
 */
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                if (state < ST_SHUTTING_DOWN) {
                    state = ST_SHUTTING_DOWN;
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }
                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        synchronized (stateLock) {
                            state = ST_TERMINATED;
                        }
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    taskQueue = newTaskQueue();
}
/**
 * Create a new {@link Queue} which will hold the tasks to execute. This default implementation will return a
 * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking
 * calls on this {@link Queue} it may make sense to {@code @Override} this and return some more performant
 * implementation that does not support blocking operations at all.
 */
protected Queue<Runnable> newTaskQueue() {
    return new LinkedBlockingQueue<Runnable>();
}
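The boss thread is created here: thread = threadFactory.newThread(new Runnable() { ... }). Note that this constructor only creates the thread; it does not call start() on it.
The thread's task queue is created at the same time; it is a LinkedBlockingQueue.
SingleThreadEventExecutor.this.run() is implemented by the subclass NioEventLoop and will be analyzed in a later article.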
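The shape of this constructor (one thread obtained from a ThreadFactory, one LinkedBlockingQueue, and a run loop that drains the queue) can be reproduced in a small plain-JDK sketch. MiniSingleThreadExecutor is a hypothetical class, not Netty's. Two behaviours in it are assumptions made for the sketch rather than facts taken from the listing above: the thread is started lazily on the first submitted task, and shutdown is signalled with a sentinel task instead of Netty's state machine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

class MiniSingleThreadExecutor {
    /** Sentinel task that tells the run loop to stop (a simplification). */
    private static final Runnable POISON = () -> { };

    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread;
    private boolean started; // guarded by "this"

    MiniSingleThreadExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        // The thread is created here but not started.
        thread = threadFactory.newThread(this::run);
    }

    /** The loop body; in Netty this part is supplied by the subclass. */
    private void run() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                if (task == POISON) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized void execute(Runnable task) {
        taskQueue.add(task);
        if (!started) { // assumption: start lazily on the first task
            started = true;
            thread.start();
        }
    }

    synchronized boolean isStarted() { return started; }

    void shutdown() { execute(POISON); }

    void join() {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs three tasks and returns "index@threadName" for each of them. */
    static List<String> demo() {
        MiniSingleThreadExecutor executor =
                new MiniSingleThreadExecutor(Executors.defaultThreadFactory());
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < 3; i++) {
            final int n = i;
            executor.execute(() -> seen.add(n + "@" + Thread.currentThread().getName()));
        }
        executor.shutdown();
        executor.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because a single thread drains a FIFO queue, the tasks run in submission order and all on the same thread.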
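Summary:
EventLoopGroup bossGroup = new NioEventLoopGroup() does the following:
1. Creates the NioEventLoop instances for the NioEventLoopGroup; by default there are (number of processors x 2) of them. Each NioEventLoop instance holds one thread and a task queue of type LinkedBlockingQueue.
2. The thread's execution logic is implemented by NioEventLoop.
3. Each NioEventLoop instance holds a selector, and that selector is optimized.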