Netty server-side source code analysis
1 - ServerBootstrap binds the local port.
AbstractNioMessageChannel$NioMessageUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise)
Submitting the register0 task is the point at which the NioEventLoop thread is started.
NioEventLoop(SingleThreadEventExecutor).doStartThread() line: 730
NioEventLoop(SingleThreadEventExecutor).startThread() line: 724
NioEventLoop(SingleThreadEventExecutor).execute(Runnable) line: 671
AbstractNioMessageChannel$NioMessageUnsafe(AbstractChannel$AbstractUnsafe).register(EventLoop, ChannelPromise) line: 468
NioEventLoop(SingleThreadEventLoop).register(ChannelPromise) line: 56
NioEventLoop(SingleThreadEventLoop).register(Channel) line: 50
NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) line: 75
ServerBootstrap(AbstractBootstrap<B,C>).initAndRegister() line: 327
ServerBootstrap(AbstractBootstrap<B,C>).doBind(SocketAddress) line: 282
ServerBootstrap(AbstractBootstrap<B,C>).bind(SocketAddress) line: 278
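In the SingleThreadEventExecutor class, doStartThread launches a task whose body calls SingleThreadEventExecutor.this.run();.
NioEventLoop drains its task queue by calling runAllTasks() from the loop inside run(). Stack frame: NioEventLoop(SingleThreadEventExecutor).runAllTasks(long). Schematically (not the literal source):
public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        for (;;) {
            // ... select and process I/O events, then:
            runAllTasks(timeoutNanos);
        }
    }
}
// Inherited from SingleThreadEventExecutor:
/**
 * Poll all tasks from the task queue and run them via {@link Runnable#run()} method.  This method stops running
 * the tasks in the task queue and returns if it ran longer than {@code timeoutNanos}.
 */
protected boolean runAllTasks(long timeoutNanos) { ... }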
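The Javadoc above describes a time-bounded drain: run queued tasks, but return once the budget in timeoutNanos is spent. Here is a minimal sketch of that idea in plain Java. TimedTaskRunner and pending() are hypothetical names made up for illustration, and this is not Netty's implementation; in particular it checks the clock after every task, which a production loop would probably avoid.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified stand-in for the task-draining half of an event loop.
// Not thread-safe: this sketch assumes a single thread adds and runs tasks.
class TimedTaskRunner {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();

    void addTask(Runnable task) {
        taskQueue.add(task);
    }

    int pending() {
        return taskQueue.size();
    }

    /**
     * Runs queued tasks until the queue is empty or the time budget is used up.
     * Returns true if at least one task was run.
     */
    boolean runAllTasks(long timeoutNanos) {
        Runnable task = taskQueue.poll();
        if (task == null) {
            return false;
        }
        final long deadline = System.nanoTime() + timeoutNanos;
        for (;;) {
            task.run();
            // Simplification (assumption): look at the clock after every task.
            if (System.nanoTime() - deadline >= 0) {
                break;
            }
            task = taskQueue.poll();
            if (task == null) {
                break;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        TimedTaskRunner runner = new TimedTaskRunner();
        for (int i = 0; i < 3; i++) {
            final int n = i;
            runner.addTask(() -> System.out.println("task " + n));
        }
        runner.runAllTasks(TimeUnit.MILLISECONDS.toNanos(10));
        System.out.println("left in queue: " + runner.pending());
    }
}
```

With a budget of 0 the sketch still runs exactly one task before giving up, so the queue always makes some progress on each pass.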
AbstractBootstrap<B,C>.doBind0(ChannelFuture, Channel, SocketAddress, ChannelPromise)
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
When the OneTimeTask is submitted, execute() checks whether the calling thread is the thread bound to this EventLoop. NioEventLoop(SingleThreadEventExecutor).execute(Runnable)
@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }
    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}
The task is placed in the private final Queue<Runnable> taskQueue; queue and is later executed by the NioEventLoop.
The NioEventLoop thread is started from the NioEventLoop(SingleThreadEventExecutor).execute(Runnable) method.
NioEventLoop.run() contains a for (;;) {} loop that runs all queued tasks.
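To make the "thread starts on the first execute()" behaviour concrete, here is a small sketch in plain Java. MiniEventLoop and isStarted() are hypothetical names for illustration only; the sketch leaves out shutdown, wakeup, and I/O selection, which the real NioEventLoop has to deal with.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical miniature of a single-threaded event loop; not Netty's class.
class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    boolean isStarted() {
        return started.get();
    }

    // True only when called from the loop's own thread.
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!inEventLoop()) {
            startThread(); // does nothing after the first call
        }
        taskQueue.add(task);
    }

    private void startThread() {
        if (started.compareAndSet(false, true)) {
            Thread t = new Thread(this::run, "mini-event-loop");
            t.setDaemon(true);
            thread = t; // published before start(), so tasks see it
            t.start();
        }
    }

    // The endless loop: take the next task and run it.
    private void run() {
        for (;;) {
            try {
                taskQueue.take().run();
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniEventLoop loop = new MiniEventLoop();
        System.out.println("started before execute: " + loop.isStarted());
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            System.out.println("in event loop: " + loop.inEventLoop());
            done.countDown();
        });
        done.await();
        System.out.println("started after execute: " + loop.isStarted());
    }
}
```

No thread exists until someone submits work, which is why the stack trace for register() ends in doStartThread().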
2 - NioEventLoop handles I/O events. NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel) handles I/O reads and writes.
3 - When NioEventLoop handles an accept event, ServerBootstrap$ServerBootstrapAcceptor.channelRead(ChannelHandlerContext, Object) takes care of it.
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// stop accept new connections for 1 second to allow the channel to recover
// See https://github.com/netty/netty/issues/1328
config.setAutoRead(false);
ctx.channel().eventLoop().schedule(new OneTimeTask() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
// still let the exceptionCaught event flow through the pipeline to give the user
// a chance to do something with it
ctx.fireExceptionCaught(cause);
}
}
ServerBootstrapAcceptor hands the accepted SocketChannel over to childGroup, which handles it from then on.
4 - Note:
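The hand-off to childGroup can be pictured as follows: the group picks one of its loops, and the accepted channel stays bound to that loop. The sketch below assumes a simple round-robin pick. MiniChildGroup, loopOf(), and the "worker-N" names are hypothetical, and strings stand in for real channels and event loops.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a child event loop group; strings replace real loops and channels.
class MiniChildGroup {
    private final String[] loops;
    private final AtomicInteger nextIndex = new AtomicInteger();
    // Filled only from the acceptor side in this sketch, so a plain map is enough.
    private final Map<String, String> assignments = new LinkedHashMap<>();

    MiniChildGroup(int nLoops) {
        loops = new String[nLoops];
        for (int i = 0; i < nLoops; i++) {
            loops[i] = "worker-" + i;
        }
    }

    // Round-robin choice (assumption of this sketch); floorMod keeps the index valid after overflow.
    String next() {
        return loops[Math.floorMod(nextIndex.getAndIncrement(), loops.length)];
    }

    // Binds the channel to the chosen loop and returns that loop's name.
    String register(String channelId) {
        String loop = next();
        assignments.put(channelId, loop);
        return loop;
    }

    String loopOf(String channelId) {
        return assignments.get(channelId);
    }

    public static void main(String[] args) {
        MiniChildGroup childGroup = new MiniChildGroup(2);
        for (String ch : new String[] {"conn-a", "conn-b", "conn-c"}) {
            System.out.println(ch + " -> " + childGroup.register(ch));
        }
    }
}
```

Because a channel never changes loops after registration in this sketch, its handlers can run without locking against each other.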
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
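Before any handler logic runs, Netty first checks whether the current thread is the EventLoop thread associated with that handler. If it is, the call simply continues on the current thread, which cuts down on thread switches and on access to shared data. If it is not, a OneTimeTask is created and placed on the task queue, and the EventLoop's own thread runs it later.
So all handler logic for a channel ends up running on that channel's EventLoop thread.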
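The "run inline if already on the loop thread, otherwise enqueue" rule can be tried out with nothing but java.util.concurrent. This is a sketch under assumptions: LoopBoundInvoker and its invoke() method are hypothetical names, and a single-thread ExecutorService plays the role of the EventLoop.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: a single-thread executor stands in for an EventLoop.
class LoopBoundInvoker {
    private volatile Thread loopThread;
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "loop");
        t.setDaemon(true);
        loopThread = t; // remember which thread is "the loop"
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    /**
     * Runs the handler on the loop thread.
     * Returns true if it ran inline, false if it was queued as a task.
     */
    boolean invoke(Runnable handler) {
        if (inEventLoop()) {
            handler.run();
            return true;
        }
        executor.execute(handler);
        return false;
    }

    void shutdown() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        LoopBoundInvoker invoker = new LoopBoundInvoker();
        // Called from main: not the loop thread, so the handler is queued.
        boolean inline = invoker.invoke(() -> {
            // Called from the loop thread: the nested handler runs inline.
            boolean nestedInline = invoker.invoke(() -> { });
            System.out.println("nested call ran inline: " + nestedInline);
        });
        System.out.println("outer call ran inline: " + inline);
        invoker.shutdown(); // queued tasks still run before the executor stops
    }
}
```

Either way the handler body executes on the same thread, so handler state needs no extra synchronization in this model.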
In NioEventLoop (a subclass of SingleThreadEventExecutor), every OneTimeTask is added to the private final Queue<Runnable> taskQueue; queue, and the loop then works through that queue.
I/O is handled synchronously on the NioEventLoop thread, in NioEventLoop.processSelectedKey(SelectionKey, AbstractNioChannel).
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
processSelectedKeys handles all of the events below (the constants come from java.nio.channels.SelectionKey).
/**
* Operation-set bit for read operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_READ</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding channel is ready for reading, has reached
* end-of-stream, has been remotely shut down for further reading, or has
* an error pending, then it will add <tt>OP_READ</tt> to the key's
* ready-operation set and add the key to its selected-key set. </p>
*/
public static final int OP_READ = 1 << 0;
/**
* Operation-set bit for write operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_WRITE</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding channel is ready for writing, has been
* remotely shut down for further writing, or has an error pending, then it
* will add <tt>OP_WRITE</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_WRITE = 1 << 2;
/**
* Operation-set bit for socket-connect operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_CONNECT</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding socket channel is ready to complete its
* connection sequence, or has an error pending, then it will add
* <tt>OP_CONNECT</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_CONNECT = 1 << 3;
/**
* Operation-set bit for socket-accept operations.
*
* <p> Suppose that a selection key's interest set contains
* <tt>OP_ACCEPT</tt> at the start of a <a
* href="Selector.html#selop">selection operation</a>. If the selector
* detects that the corresponding server-socket channel is ready to accept
* another connection, or has an error pending, then it will add
* <tt>OP_ACCEPT</tt> to the key's ready set and add the key to its
* selected-key set. </p>
*/
public static final int OP_ACCEPT = 1 << 4;
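Since readyOps is just a bit set built from the four constants above, the checks in processSelectedKey come down to bit masking. A small sketch using only the JDK follows; ReadyOps, describe(), and triggersRead() are hypothetical helper names, and triggersRead() copies the read-side condition from the processSelectedKey excerpt quoted earlier.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for inspecting a ready-operation bit set.
class ReadyOps {
    // Lists the operations whose bits are set, in ascending bit order.
    static List<String> describe(int readyOps) {
        List<String> names = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            names.add("READ");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            names.add("WRITE");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            names.add("CONNECT");
        }
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            names.add("ACCEPT");
        }
        return names;
    }

    // Same condition as the read branch in the quoted processSelectedKey:
    // READ or ACCEPT is set, or no bit is set at all.
    static boolean triggersRead(int readyOps) {
        return (readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0;
    }

    public static void main(String[] args) {
        int ops = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
        System.out.println(describe(ops));
        System.out.println(triggersRead(SelectionKey.OP_ACCEPT));
    }
}
```

Note that READ and ACCEPT share one branch, which is why a server channel's accept ends up in the same unsafe.read() call as an ordinary read.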
Of these, read events (including accept on a server channel) are handled by NioMessageUnsafe.read():
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
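All of these events are in turn dispatched through static methods of AbstractChannelHandlerContext. When the caller is not on the handler's executor thread, AbstractChannelHandlerContext wraps the call in a OneTimeTask, as shown here, and submits it so that the executor's thread handles it.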
AbstractChannelHandlerContext {
    static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new OneTimeTask() {
                @Override
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }
}
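Seen this way, the threading models of Mina and Netty are essentially the same. Netty, though, inherited from and grew out of Mina, and has since been tuned through real-world use at many large companies, so it is only natural that the student has surpassed the master.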