Netty 4 Source Code Reading and Analysis: The Server Startup Process
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new EchoServerHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)
     .childOption(ChannelOption.SO_KEEPALIVE, true);
    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}
First, let's look at the NioEventLoopGroup class. Its class hierarchy is shown in the diagram below:
Its constructor eventually calls into MultithreadEventExecutorGroup:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
        EventExecutorChooserFactory chooserFactory, Object... args) {
    children = new EventExecutor[nThreads];
    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            ......
        } finally {
            // ... omitted
        }
    }
    chooser = chooserFactory.newChooser(children);
    // ... omitted
}
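When no thread count is given (as with workerGroup above), the default number of threads is twice the number of available CPU processors. Let's focus on the newChild method: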
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(),
            (RejectedExecutionHandler) args[2]);
}
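To make this structure concrete, here is a minimal plain-JDK sketch, assuming a single-thread ExecutorService can stand in for a NioEventLoop. SimpleLoopGroup and its methods are hypothetical, not Netty API: it fills a children array the way MultithreadEventExecutorGroup does, uses 2 * availableProcessors() when given 0, and hands out children round-robin, with a bit mask when the count is a power of two and a modulo otherwise (the same split Netty's chooser makes).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified analogue of MultithreadEventExecutorGroup (not Netty code).
final class SimpleLoopGroup {
    private final ExecutorService[] children;
    private final AtomicInteger idx = new AtomicInteger();

    SimpleLoopGroup(int nThreads) {
        // 0 means "use the default", mirroring the 2 * CPU processors default
        int n = nThreads == 0 ? defaultThreads() : nThreads;
        children = new ExecutorService[n];
        for (int i = 0; i < n; i++) {
            // one thread per child, like one NioEventLoop per child
            children[i] = Executors.newSingleThreadExecutor();
        }
    }

    static int defaultThreads() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
    }

    static boolean isPowerOfTwo(int n) {
        return (n & -n) == n;
    }

    // Round-robin index: bit mask for power-of-two sizes, modulo otherwise
    int nextIndex() {
        int i = idx.getAndIncrement();
        return isPowerOfTwo(children.length)
                ? i & (children.length - 1)
                : Math.abs(i % children.length);
    }

    ExecutorService next() {
        return children[nextIndex()];
    }

    int size() {
        return children.length;
    }

    void shutdown() {
        for (ExecutorService e : children) {
            e.shutdown();
        }
    }

    public static void main(String[] args) {
        SimpleLoopGroup group = new SimpleLoopGroup(0);
        System.out.println("threads = " + group.size());
        group.shutdown();
    }
}
```

The mask trick only works because, for a power-of-two length, `i & (length - 1)` equals `i % length` for non-negative `i`, and it stays non-negative even after the counter overflows.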
Its job is to construct a NioEventLoop instance. Let's look at this class:
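A NioEventLoop can be thought of as a single thread that keeps taking tasks from its queue and running them (and, as we will see, also polls its Selector for I/O events). Let's look at its run method: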
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
........
}
}
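The else branch above splits time between I/O and tasks. Here is that arithmetic in isolation, as a sketch with a hypothetical helper name: after processSelectedKeys() takes ioTime, runAllTasks gets ioTime * (100 - ioRatio) / ioRatio. With the default ioRatio of 50, tasks get as much time as I/O did; with 80, a quarter of it.

```java
// Hypothetical helper reproducing the time-budget expression in run() (not Netty code).
final class IoRatioBudget {
    // Returns the task time budget in nanoseconds, or -1 for ioRatio == 100,
    // where run() calls runAllTasks() without any deadline.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio must be in (0, 100]: " + ioRatio);
        }
        if (ioRatio == 100) {
            return -1;
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long io = 10_000_000L; // pretend processSelectedKeys() took 10 ms
        System.out.println(taskBudgetNanos(io, 50)); // 10000000: same as the I/O time
        System.out.println(taskBudgetNanos(io, 80)); // 2500000: a quarter of the I/O time
    }
}
```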
This is an endless for loop. First, let's look at calculateStrategy:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
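If there are no tasks in the queue, the SELECT strategy is returned, and that is the case we focus on here. (If there are tasks, selectNowSupplier performs a non-blocking selectNow() and its result, which matches neither CONTINUE nor SELECT, falls through to default.) Next, the select method is executed:
private void select(boolean oldWakenUp) throws IOException {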
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
rebuildSelector();
selector = this.selector;
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
........
}
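The deadline and timeout arithmetic at the top of select() can be tried on its own. This sketch assumes the 1-second fallback that delayNanos uses when no scheduled task exists; SelectTimeout and its methods are hypothetical names, and a negative argument stands for "no scheduled task".

```java
import java.util.concurrent.TimeUnit;

// Hypothetical reproduction of select()'s timeout computation (not Netty code).
final class SelectTimeout {
    // Assumed fallback delay when no scheduled task exists: 1 second
    static final long SCHEDULE_PURGE_INTERVAL = TimeUnit.SECONDS.toNanos(1);

    // nextScheduledDelayNanos < 0 means "no scheduled task" in this sketch
    static long delayNanos(long nextScheduledDelayNanos) {
        return nextScheduledDelayNanos < 0 ? SCHEDULE_PURGE_INTERVAL : nextScheduledDelayNanos;
    }

    // Same expression as in select(): adding 0.5 ms before dividing rounds to the nearest ms
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500000L) / 1000000L;
    }

    public static void main(String[] args) {
        long now = System.nanoTime();
        long deadline = now + delayNanos(-1); // no scheduled task
        System.out.println(timeoutMillis(deadline, now)); // 1000
    }
}
```

A result of 0 (for example, a deadline 0.4 ms away) is what sends the loop down the selectNow() branch instead of a blocking select.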
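First, selectDeadLineNanos = current time + delayNanos(currentTimeNanos). delayNanos is the time until the next scheduled task is due, or 1 second when there is no scheduled task, so in the common case the deadline is one second from now. The loop converts the remaining time to timeoutMillis (adding 500000L before dividing rounds to the nearest millisecond) and checks whether the deadline has already been reached. If it has, it calls selectNow, a non-blocking method, to check whether any events of interest have arrived, and exits. If not, it continues with:
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call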
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
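This covers one scenario: if a task was submitted while wakenUp was already true, that task never got a chance to call Selector#wakeup, so nothing would interrupt the blocking select that follows. The loop therefore checks the task queue once more, and if there are tasks it sets wakenUp and does a non-blocking selectNow instead.
Next, the thread blocks in selector.select(timeoutMillis), waiting for events of interest. It breaks out of the loop and returns as soon as any of several conditions holds: keys were selected, oldWakenUp or wakenUp is set, tasks or scheduled tasks are pending, or the thread was interrupted. The remaining code works around the NIO epoll bug, in which select keeps returning early without any selected keys: after SELECTOR_AUTO_REBUILD_THRESHOLD premature returns in a row, the selector is rebuilt. I will cover this bug in detail in a separate article.
After select returns, run continues. Here we keep ioRatio at its default of 50, meaning that processing I/O events and running queued tasks each get roughly half of the loop's CPU time.
Back to the main thread. An EventLoopGroup manages these EventLoops, so the first two lines of the program can be read as: one thread does the boss's work and availableProcessors * 2 threads do the workers' work, i.e. one boss and N workers cooperate to get the job done. In what follows, we want to find out what the boss and the workers each do, and why the work is split between a boss and workers at all.
Now ServerBootstrap. We use its default constructor, so let's move on to the next line and see what the group method does: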
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
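The parent class (AbstractBootstrap) holds bossGroup, and childGroup in the subclass ServerBootstrap is our workerGroup. This is the builder pattern, which pays off when there are many optional parameters.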
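As a minimal sketch of this builder style (BootstrapBuilder is a hypothetical class, not Netty API, and the type parameter G stands in for EventLoopGroup), each setter validates its argument, refuses to be called twice, and returns this so the calls can be chained:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical builder mirroring the checks in group() above (not Netty code).
final class BootstrapBuilder<G> {
    private G group;      // plays the role of bossGroup (held by the parent class in Netty)
    private G childGroup; // plays the role of workerGroup
    private final Map<String, Object> options = new LinkedHashMap<>();

    BootstrapBuilder<G> group(G parentGroup, G childGroup) {
        if (parentGroup == null) throw new NullPointerException("group");
        if (this.group != null) throw new IllegalStateException("group set already");
        if (childGroup == null) throw new NullPointerException("childGroup");
        if (this.childGroup != null) throw new IllegalStateException("childGroup set already");
        this.group = parentGroup;
        this.childGroup = childGroup;
        return this; // returning this is what allows b.group(...).option(...)
    }

    BootstrapBuilder<G> option(String name, Object value) {
        options.put(name, value);
        return this;
    }

    G group() { return group; }
    G childGroup() { return childGroup; }
    Map<String, Object> options() { return options; }
}
```

Optional settings that are never called simply keep their defaults, which is why this style scales better than a constructor with many parameters.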
Next, the channel method stores NioServerSocketChannel.class in a ChannelFactory (a ReflectiveChannelFactory), which later creates NioServerSocketChannel instances through reflection.
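A factory of this kind only needs to remember a Class object and call its no-argument constructor on demand. The sketch below is a hypothetical, simplified ReflectiveFactory rather than Netty's own class, and it assumes the target class has a public no-argument constructor (NioServerSocketChannel does).

```java
// Hypothetical reflective factory (not Netty's ReflectiveChannelFactory).
final class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) throw new NullPointerException("clazz");
        this.clazz = clazz;
    }

    // Creates a fresh instance on every call via the public no-arg constructor
    T newInstance() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> f = new ReflectiveFactory<>(StringBuilder.class);
        System.out.println(f.newInstance().append("created reflectively"));
    }
}
```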
Then the childHandler of ServerBootstrap is set; we label it ChannelHandler-ChannelInitializer-1.
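Next, ChannelOption.SO_BACKLOG is used when the server socket (ServerSocket) is bound: it is passed as the backlog argument and sets the maximum length of the queue that temporarily holds connections which have completed the three-way handshake but have not been accepted yet, for example while all of the server's request-processing threads are busy.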
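ChannelOption.SO_KEEPALIVE enables TCP's keepalive (heartbeat) mechanism. It only kicks in after both ends have established the connection (both in the ESTABLISHED state) and the upper layers have exchanged no data for about two hours (the exact idle time is an operating system setting). Because it is set through childOption, it applies to the accepted child channels, not to the listening socket.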
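Both options correspond to plain JDK socket settings. The sketch below uses only java.nio: the backlog is the second argument of ServerSocketChannel.bind, and keepalive is StandardSocketOptions.SO_KEEPALIVE set on the accepted (child) socket, mirroring option versus childOption. The class name and the loopback setup are illustrative assumptions, and how the OS treats the backlog value (rounding or clamping it) is platform-dependent.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of backlog and keepalive with the plain JDK (not Netty code).
final class BacklogKeepAliveDemo {
    static boolean acceptWithKeepAlive(int backlog) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            // backlog caps connections that finished the handshake but are not accept()ed yet
            server.bind(new InetSocketAddress("127.0.0.1", 0), backlog);
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            // The client connect completes in the kernel's accept queue before accept() runs
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port));
                 SocketChannel accepted = server.accept()) {
                // "childOption": applied to the accepted socket, not the listening one
                accepted.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
                return accepted.getOption(StandardSocketOptions.SO_KEEPALIVE);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("SO_KEEPALIVE on accepted socket: " + acceptWithKeepAlive(128));
    }
}
```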
Next, let's look at the bind method, which delegates to doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();//1
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
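doBind follows a "run now if already done, otherwise wait" pattern. Here is a sketch of the same control flow using java.util.concurrent.CompletableFuture instead of Netty's ChannelFuture; BindAfterRegister and bindWhenRegistered are hypothetical names, and the Supplier stands in for doBind0.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of doBind()'s control flow (not Netty code).
final class BindAfterRegister {
    static <R> CompletableFuture<R> bindWhenRegistered(CompletableFuture<?> regFuture, Supplier<R> bind) {
        CompletableFuture<R> promise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Fast path: registration already succeeded, bind right away
            complete(promise, bind);
        } else {
            // Registration pending (or failed): bind later, or propagate the failure
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    complete(promise, bind);
                }
            });
        }
        return promise;
    }

    private static <R> void complete(CompletableFuture<R> promise, Supplier<R> bind) {
        try {
            promise.complete(bind.get());
        } catch (RuntimeException e) {
            promise.completeExceptionally(e);
        }
    }
}
```

As in doBind, the caller always gets a future back immediately; only the moment at which the bind step runs differs.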
First, initAndRegister (some code omitted):
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
....
ChannelFuture regFuture = config().group().register(channel);
....
}
The first step creates a NioServerSocketChannel instance through channelFactory. Here is the class diagram of NioServerSocketChannel:
Let's look at its constructor:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
newSocket simply uses the JDK (SelectorProvider.openServerSocketChannel()) to create a ServerSocketChannel instance. Continuing:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);// the event of interest is accept
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
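super eventually reaches AbstractChannel (along the way, AbstractNioChannel stores OP_ACCEPT as readInterestOp and switches the JDK channel to non-blocking mode):
protected AbstractChannel(Channel parent) {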
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
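Here parent = null. First, newUnsafe: it constructs a NioMessageUnsafe instance, which mainly provides the read method used to handle read events (for a server channel, incoming connections), as we will see later.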
Next, the pipeline: it is an instance of DefaultChannelPipeline and maintains a doubly linked list whose elements are AbstractChannelHandlerContext objects, used to process inbound and outbound events:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
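Note the two boolean flags passed to super, (inbound, outbound). TailContext is flagged as an inbound handler. HeadContext implements both ChannelInboundHandler and ChannelOutboundHandler, but its constructor flags it as outbound only. So, at this point, the ChannelHandlerContext list maintained by the pipeline consists of just HeadContext <-> TailContext: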
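The head/tail sentinel list and the flag-based lookup can be modeled in a few lines. This is a heavily simplified, hypothetical MiniPipeline (string names instead of handlers, no event dispatch), with the head and tail flags copied from the constructors above: addLast always inserts just before tail, inbound lookups walk next pointers skipping contexts not flagged inbound, and outbound lookups walk prev pointers the same way.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of DefaultChannelPipeline's context list (not Netty code).
final class MiniPipeline {
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev, next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    final Ctx head = new Ctx("head", false, true); // same flags as the HeadContext constructor
    final Ctx tail = new Ctx("tail", true, false); // same flags as the TailContext constructor

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert just before tail, like addLast
    void addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    void remove(String name) {
        for (Ctx c = head.next; c != tail; c = c.next) {
            if (c.name.equals(name)) {
                c.prev.next = c.next;
                c.next.prev = c.prev;
                return;
            }
        }
    }

    // Inbound events travel head -> tail
    static Ctx findContextInbound(Ctx from) {
        Ctx c = from.next;
        while (c != null && !c.inbound) c = c.next;
        return c;
    }

    // Outbound events travel tail -> head
    static Ctx findContextOutbound(Ctx from) {
        Ctx c = from.prev;
        while (c != null && !c.outbound) c = c.prev;
        return c;
    }

    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) out.add(c.name);
        return out;
    }
}
```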
Back in the NioServerSocketChannel constructor, the next step constructs a NioServerSocketChannelConfig instance. Here is the class diagram of this class:
Moving on, back in initAndRegister, next comes the init method:
void init(Channel channel) throws Exception {
.........
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;// i.e. workerGroup
final ChannelHandler currentChildHandler = childHandler;// i.e. ChannelHandler-ChannelInitializer-1
.........
p.addLast(new ChannelInitializer<Channel>() {// we label this ChannelHandler-ChannelInitializer-2
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
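This adds a new DefaultChannelHandlerContext (wrapping ChannelHandler-ChannelInitializer-2) to the pipeline. Because the channel is not registered yet, the initializer's handlerAdded callback is not run now but deferred until registration. After the addition, the list maintained by the pipeline looks like this:
The labels in parentheses in the figure above are mine, added to tell apart the anonymous class instances created along the way. Next, back in initAndRegister, look at this line: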
ChannelFuture regFuture = config().group().register(channel);
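config().group() returns bossGroup, and channel is the NioServerSocketChannel instance created earlier. Now the register method. Note the next() call here: it obtains an EventExecutor (in fact a NioEventLoop instance) via chooser.next(). Remember the array children = new EventExecutor[nThreads] above? When the array length is a power of two, next() picks a NioEventLoop with an incrementing index & (children.length - 1); otherwise it falls back to a modulo. Here the boss group has only one thread. Following the calls, we eventually reach Unsafe.register:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {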
.......
AbstractChannel.this.eventLoop = eventLoop;// the boss's NioEventLoop
if (eventLoop.inEventLoop()) {// false here: the caller is the main thread and the boss thread has not started yet
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
......
}
}
}
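register() uses a pattern that appears all over Netty: if the caller is already on the event loop thread, do the work directly; otherwise submit it as a task. A minimal sketch, assuming a single-thread ExecutorService plays the role of the boss NioEventLoop (LoopDispatch and the thread name are hypothetical):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the inEventLoop() dispatch in register() (not Netty code).
final class LoopDispatch {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "boss-loop");
        t.setDaemon(true);
        loopThread.set(t); // remember which thread is "the" loop thread
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    void dispatch(Runnable work) {
        if (inEventLoop()) {
            work.run();             // already on the loop thread: call directly
        } else {
            executor.execute(work); // e.g. the main thread: hand it over as a task
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Running everything for a channel on one thread is what lets the rest of the code (pipeline changes, selector registration) get by without locks.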
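As you can see, the main thread wraps the registration into a task and hands it to the boss's thread (the first execute() call also starts that thread), and then the main thread returns: the process is asynchronous. Next, let's see what register0 does:
private void register0(ChannelPromise promise) {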
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
.....
}
}
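doRegister calls the JDK's register on the underlying ServerSocketChannel: it registers the channel with the event loop's selector, with an empty interest set (0) for now and the NioServerSocketChannel itself as the attachment. Next, pipeline.invokeHandlerAddedIfNeeded runs the handlerAdded callback that was deferred when ChannelHandler-ChannelInitializer-2 was added before registration; since we are already on the boss thread, it runs right away and ends up in: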
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
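initChannel calls the initChannel method of "ChannelHandler-ChannelInitializer-2". (config.handler() is null, since our code never called .handler(), so nothing is added for it.) That method submits a task to the boss thread which will add a ServerBootstrapAcceptor context, also an inbound handler, to the pipeline; then "ChannelHandler-ChannelInitializer-2" removes itself from the pipeline. ServerBootstrapAcceptor is still waiting in the task queue and has not been added yet, so the list maintained by the pipeline is back to HeadContext <-> TailContext: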
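The order of events here is easy to lose track of, so here is a toy simulation with the pipeline as a list of names and the boss thread's task queue as an ArrayDeque. InitializerSimulation is hypothetical and only reproduces the ordering described above, not Netty's API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical simulation of the initializer's ordering (not Netty code).
final class InitializerSimulation {
    final List<String> pipeline = new ArrayList<>(Arrays.asList("head", "initializer-2", "tail"));
    final Queue<Runnable> taskQueue = new ArrayDeque<>(); // stands in for the boss loop's queue

    // What happens when the deferred handlerAdded finally fires
    void handlerAdded() {
        // initChannel(): adding the acceptor is deferred by submitting a task
        taskQueue.add(() -> pipeline.add(pipeline.size() - 1, "ServerBootstrapAcceptor"));
        // the one-shot initializer then removes itself
        pipeline.remove("initializer-2");
    }

    // Later, the loop drains its task queue
    void runAllTasks() {
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }
}
```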
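Continuing, pipeline.fireChannelRegistered() eventually calls invokeChannelRegistered(findContextInbound()). At this point the only inbound handler in the pipeline is TailContext, so its channelRegistered method is called, and it does nothing. Note that isActive() is still false in register0 because the socket is not bound yet, so channelActive is not fired there. Next comes bind, mainly in doBind0: it submits the bind as a task to the channel's event loop, and the call goes through the pipeline and eventually reaches unsafe.bind: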
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
......
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
.....
}
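doBind calls the bind method provided by the JDK, passing the SO_BACKLOG value as the backlog. Since the channel has just become active, firing the channelActive event is then submitted to the thread as a task, and that task reaches HeadContext.channelActive:
public void channelActive(ChannelHandlerContext ctx) throws Exception {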
ctx.fireChannelActive();
readIfIsAutoRead();
}
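Note that by now the queued task has run, so the list maintained by the pipeline is HeadContext <-> ServerBootstrapAcceptor <-> TailContext: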
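ctx.fireChannelActive finds the next inbound handler (from left to right) and calls its channelActive; in practice nothing is done there for now. Next, readIfIsAutoRead() (autoRead is on by default) goes through channel.read() and the pipeline to AbstractUnsafe.beginRead, which calls AbstractNioChannel.doBeginRead: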
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();// 0 at this point
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);// readInterestOp is OP_ACCEPT
}
}
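In other words, this registers interest in accept so the channel can wait for incoming connections: the event of interest for the opened NioServerSocketChannel is accept!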
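Stripped of Netty, the startup sequence boils down to a few JDK NIO calls. The sketch below is not Netty code, just the same order of operations with java.nio: register with an empty interest set and an attachment (as in doRegister), bind with a backlog (doBind), then add OP_ACCEPT (doBeginRead). A loopback client then connects so the selector reports the key as acceptable. AcceptSketch and the attachment string are illustrative.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// Hypothetical plain-JDK replay of the startup order (not Netty code).
final class AcceptSketch {
    static String acceptOne() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // required before register()
            // like doRegister(): no interest ops yet; the attachment stands in for the Netty channel
            SelectionKey key = server.register(selector, 0, "server-channel");
            server.bind(new InetSocketAddress("127.0.0.1", 0), 128); // like doBind()
            // like doBeginRead(): OR in the read interest op, which is OP_ACCEPT here
            int ops = key.interestOps();
            if ((ops & SelectionKey.OP_ACCEPT) == 0) {
                key.interestOps(ops | SelectionKey.OP_ACCEPT);
            }
            int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
            try (SocketChannel client = SocketChannel.open(new InetSocketAddress("127.0.0.1", port))) {
                for (int attempt = 0; attempt < 10; attempt++) {
                    selector.select(1000);
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey k = it.next();
                        it.remove();
                        if (k.isAcceptable()) {
                            try (SocketChannel accepted = ((ServerSocketChannel) k.channel()).accept()) {
                                if (accepted != null) {
                                    return (String) k.attachment();
                                }
                            }
                        }
                    }
                }
            }
        }
        return null; // nothing accepted within the attempts
    }

    public static void main(String[] args) throws IOException {
        System.out.println("accepted on key with attachment: " + acceptOne());
    }
}
```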
At this point, the Netty server startup is complete. Let's summarize:
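1. During startup it is mainly the boss thread that carries out the sequence of operations: initialization, registration and so on are each wrapped into a task and put into the thread's queue. The thread does two things: it handles events of interest, and it runs the tasks in its queue.
2. On the NIO side, Netty binds the NioServerSocketChannel to a selector, registers the events of interest on the channel, and then the boss thread keeps polling for incoming events of interest.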
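Point 1 can be illustrated with a stripped-down, hypothetical event loop: one thread that blocks in select() and runs queued tasks, where execute() from another thread enqueues the task and wakes the selector. The compareAndSet-gated wakeup is modeled on the wakenUp flag in the select() code above; ioRatio, scheduled tasks, key processing and the epoll workaround are all left out, so MiniEventLoop is a sketch, not NioEventLoop.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical single-threaded loop: select() plus a task queue (not Netty code).
final class MiniEventLoop implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    private final Thread thread = new Thread(this, "mini-loop");
    private volatile boolean running = true;

    MiniEventLoop() throws IOException {
        selector = Selector.open();
        thread.setDaemon(true);
    }

    void start() {
        thread.start();
    }

    void execute(Runnable task) {
        tasks.add(task);
        // Only the first outside submitter since the last reset calls wakeup(); a wakeup()
        // issued before select() starts makes that select() return immediately.
        if (Thread.currentThread() != thread && wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }

    void shutdown() {
        execute(() -> running = false);
    }

    @Override
    public void run() {
        try {
            while (running) {
                wakenUp.set(false);
                if (tasks.isEmpty()) {
                    selector.select(1000); // block until I/O, wakeup() or a 1 s timeout
                } else {
                    selector.selectNow();  // tasks are waiting: only poll
                }
                selector.selectedKeys().clear(); // a real loop would process these keys here
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            try {
                selector.close();
            } catch (IOException ignored) {
                // nothing useful to do while shutting down
            }
        }
    }

    public static void main(String[] args) throws Exception {
        MiniEventLoop loop = new MiniEventLoop();
        loop.start();
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            System.out.println("task ran on " + Thread.currentThread().getName());
            done.countDown();
        });
        done.await();
        loop.shutdown();
    }
}
```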