Netty 原始碼閱讀 —— 服務端建立
之前專案中用過netty,這次趁著面試空閒時間,重新梳理一遍netty原始碼,從服務端建立開始,話不多說,直接上程式碼
先看看netty服務端建立的整體程式碼,大概如下所示:
public void bind(int port) throws Exception { EventLoopGroup workGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap();b.group(bossGroup,workGroup).channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,24) .childHandler(new ChildChannelHandler()); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully();} } private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { System.out.println("server initChannel.."); socketChannel.pipeline().addLast(new ServerHandler()); } }
ok , 我們先來看看new NioEventLoopGroup() 的過程中發生了什麼:
public NioEventLoopGroup() { this(0); } public NioEventLoopGroup(int nThreads) { this(nThreads, (ThreadFactory)null);} public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(nThreads, threadFactory, new Object[]{selectorProvider}); }可以看到首先通過層層呼叫後 最終呼叫了這一個方法,其實最後是呼叫了父類的構造方法,傳入了三個引數,第一個是執行緒數,第二個 threadFactory 為空,第三個是一個SelectorProvider,顧名思義其實就是一個Selector提供類,我們可以看一下這個類的原始碼,果然不出所料,如果看到這裡你還不熟悉的話,建議先去看看java NIO 的知識。
super(nThreads, threadFactory, new Object[]{selectorProvider});
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public abstract AbstractSelector openSelector()
throws IOException;
public abstract ServerSocketChannel openServerSocketChannel()
throws IOException;
public abstract SocketChannel openSocketChannel()
throws IOException;
ok,剛剛說到通過super 關鍵字呼叫父類的構造方法,NioEventLoopGroup 父類是 MultithreadEventLoopGroup ,那我們來看看父類構造方法的實現
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2)); protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0?DEFAULT_EVENT_LOOP_THREADS:nThreads, threadFactory, args); }
發現MultithreadEventLoopGroup這個類又呼叫了它父類的構造方法,ok,層層往上找 ,找到MultithreadEventLoopGroup 的父類 MultithreadEventExecutorGroup,構造方法如下:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { boolean success = false; try { children[i] = newChild(threadFactory, args); success = true; } catch (Exception e) { // TODO: Think about if this is a good exception type throw new IllegalStateException("failed to create a child event loop", e); } finally { if (!success) { for (int j = 0; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break; } } } } } final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete(Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); }
}這裡是不是有點熟悉,回顧一下netty 的原理,這段程式碼就是netty reactor 模式的實現。chooser 用來隨機選擇一個child 執行緒執行。children 即為工作執行緒,型別為SingleThreadEventExecutor,我們來看看這個類的原始碼
通過這個類的方法我們可以看到,SingleThreadEventExecutor這個類就是具體的task 執行類了。是不是豁然開朗
接著我們繼續看一下客戶端的連線過程
還記得 MultithreadEventExecutorGroup 類構造的時候有一行 這個程式碼嗎?
children[i] = newChild(threadFactory, args);
我們看一下它發生了什麼
@Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); }
public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); }
可以看到 這行程式碼建立了一個 一個NioEventLoop , 這個物件裡面 打開了一個 selector, 客戶端的連線動作基本都是由這個類管理。我們看一下run() 方法:
protected void run() { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to // excessive CPU consumption. try { Thread.sleep(1000); } catch (InterruptedException e) { // Ignore. } } } }
這段程式碼邏輯大概是,先判斷一下當前有沒有任務需要執行,如果有任務,則執行任務下發邏輯,我們可以看一下processSelectedKeys() 這個方法
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 for (;;) { if (selectedKeys[i] == null) { break; } selectedKeys[i] = null; i++; } selectAgain(); // Need to flip the optimized selectedKeys to get the right reference to the array // and reset the index to -1 which will then set to 0 on the for loop // to start over again. // // See https://github.com/netty/netty/issues/1523 selectedKeys = this.selectedKeys.flip(); i = -1; } } }
我們看一下 processSelectedKey 這個方法做了什麼操作,如下
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; } try { int readyOps = k.readyOps(); // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
我們可以看到,當註冊物件是 SelectionKey.OP_READ 這個時,執行unsafe.read() 操作,我們看看這個操作幹了什麼事情,我們發現這是一個抽象方法,看一下NioMessageUnsafe 這個子類的實現
@Override public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); if (!config.isAutoRead() && !isReadPending()) { // ChannelConfig.setAutoRead(false) was called in the meantime removeReadOp(); return; } final int maxMessagesPerRead = config.getMaxMessagesPerRead(); final ChannelPipeline pipeline = pipeline(); boolean closed = false; Throwable exception = null; try { try { for (;;) { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } // stop reading and remove op if (!config.isAutoRead()) { break; } if (readBuf.size() >= maxMessagesPerRead) { break; } } } catch (Throwable t) { exception = t; } setReadPending(false); int size = readBuf.size(); for (int i = 0; i < size; i ++) { pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); pipeline.fireChannelReadComplete(); if (exception != null) { if (exception instanceof IOException && !(exception instanceof PortUnreachableException)) { // ServerChannel should not be closed even on IOException because it can often continue // accepting incoming connections. (e.g. too many open files) closed = !(AbstractNioMessageChannel.this instanceof ServerChannel); } pipeline.fireExceptionCaught(exception); } if (closed) { if (isOpen()) { close(voidPromise()); } } } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!config.isAutoRead() && !isReadPending()) { removeReadOp(); } } }
我們看一下 這行程式碼 int localRead = doReadMessages(readBuf);
protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = javaChannel().accept(); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }
其實就是 接收客戶端連線的邏輯。
final ChannelPipeline pipeline = pipeline(); 這一行程式碼看一下,我們發現,
@Override public ChannelPipeline pipeline() { return pipeline; }
private final ChannelPipeline pipeline;
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class); static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException(); static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException(); static { CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE); } private MessageSizeEstimator.Handle estimatorHandle; private final Channel parent; private final long hashCode = ThreadLocalRandom.current().nextLong(); private final Unsafe unsafe; private final ChannelPipeline pipeline; private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null); private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true); private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false); private final CloseFuture closeFuture = new CloseFuture(this); private volatile SocketAddress localAddress; private volatile SocketAddress remoteAddress; private volatile EventLoop eventLoop; private volatile boolean registered; /** Cache for the string representation of this channel */ private boolean strValActive; private String strVal;
我們發現了 pipeline 這個物件是final ,而且 一個Channel 封裝了 很多全域性物件,這些物件都是全域性不可更改的。看到這個物件,是不是對Netty 的設計有了更多的一些感悟呢
拿到了 pipeline 之後,執行了pipeline.fireChannelRead(readBuf.get(i)); 這一行程式碼,我們深入看一下
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { if (msg == null) { throw new NullPointerException("msg"); } final AbstractChannelHandlerContext next = findContextInbound(); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(msg); } else { executor.execute(new OneTimeTask() { @Override public void run() { next.invokeChannelRead(msg); } }); } return this; }
然後我們發現,next.invokeChannelRead(msg); 這裡回調了 channelRead 方法 , 所以你明白為什麼 netty 建立連線之後會回撥channelRead 方法了吧
private void invokeChannelRead(Object msg) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } }
如果你跟我一樣,一直對如何回撥這個邏輯比較好奇,那你可能會有額外發現,我們跟蹤一下這一行程式碼
final AbstractChannelHandlerContext next = findContextInbound();
private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; }
我們突然發現了ChannelHandlerContext 的設計,是一個雙向連結串列。也就是說netty是通過一個雙向連結串列來實現通訊過程中上下文管理的。這裡你是不是又想到了linkedlist 呢
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext { volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev; private final boolean inbound; private final boolean outbound; private final AbstractChannel channel; private final DefaultChannelPipeline pipeline; private final String name; private boolean removed; // Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. final EventExecutor executor; private ChannelFuture succeededFuture; // Lazily instantiated tasks used to trigger events to a handler with different executor. // These needs to be volatile as otherwise an other Thread may see an half initialized instance. // See the JMM for more details private volatile Runnable invokeChannelReadCompleteTask; private volatile Runnable invokeReadTask; private volatile Runnable invokeChannelWritableStateChangedTask; private volatile Runnable invokeFlushTask;
ok,回到我們的channelRead 方法,這裡執行的是ServerBootstrapAdapter 的 channelRead 方法,我們可以看到這裡完成了 新增 childHandler ,設定客戶端引數,以及註冊到多路複用器的邏輯。到這裡,整個連線過程清晰無疑。
public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }