Netty進階:Pilpeline原理分析
在上篇文章中提到每個SocketCahnnel或者ServerSocketChannel的父類AbstractChannel的建構函式中會例項化DefaultChannelPipeline。在本文中會詳細的介紹ChannelPiple例項化過程中的細節、以及ChannelPiple的工作原理。
1. ChannelPiple的例項化細節
首先來看看DefaultChannelPipeline類的繼承關係圖:
看到DefaultChannelPipeline實現了 ChannelInboundInvoker及ChannelOutboundInvoker兩個介面。顧名思義,一個是處理通道的inbound事件呼叫器,另一個是處理通道的outbound事件呼叫器。
- inbound: 本質上就是執行I/O執行緒將從外部read到的資料傳遞給業務執行緒的一個過程。
- outbound: 本質上就是業務執行緒將資料傳遞給I/O執行緒, 直至傳送給外部的一個過程。
再回顧一下上篇文章中已經提到過的DefaultChannelPipeline的構造方法
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null );
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在此建構函式中綁定了當前channel例項,然後初始化雙向列表的頭尾節點。其中head是HeadContext的例項,tail是TailContext的例項,HeadContext與TailContext都是DefaultChannelPipeline的內部類。下面看看它們的類的繼承結構。
HeadContext類:
- HeadContext與TailContext都是通道的handler(中文一般叫做處理器)
- HeadContext既可以用於outbound過程的handler,也可以用於inbound過程的handler
- TailContext只可以用於inbound過程的handler
- HeadContext 與 TailContext 同時也是一個處理器上下文物件
下面繼續進入到HeadContext的構造方法
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
//獲取channel中的unsafe物件
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
AbstractChannelHandlerContext的構造方法:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
此構造方法,只是設定了當前context物件對應的Pipeline以及此context是作用於outbound。最後一行程式碼setAddComplete設定當前節點的狀態,通過sun.misc.Unsafe的CAS操作來完成的。
小結:
- 每個channel初始化時,都會建立一個與之對應的pipeline
- 此pipeline內部就是一個雙向連結串列
- 雙向連結串列的頭結點是處理outbound過程的handler,尾節點是處理inbound過程的handler;
- 雙向連結串列的結點同時還是handler上下文物件;
2. 新增Handller
通過上一節 知道了ChannelPipeline 中含有兩個 ChannelHandlerContext(同時也是 ChannelHandler), 但是這個 Pipeline是如何新增我們指定的handller呢?一般在我們服務端會有如下的程式碼:
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Someonehandller());
}
如果看過ServerBootStrap原始碼分析Netty進階篇:ServerBootStrap原始碼分析就知道這裡只是設定了ServerBootStrap中的childHandller和AbstractBootStart中的handller屬性,當然handler()不是必須的。那麼這裡設定的handler和何時新增到PipleLine中呢?
AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister->init(channel)
ServerBootStrap重寫了init方法,比BootStartp實現稍微複雜一些。
@Override
void init(Channel channel) throws Exception {
//省略部分程式碼
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
}
}
這個方法此處傳入的引數channel是NioServerSocketChannel,channel.pipeline()獲取此channel關聯的PipleLine,接下來是呼叫關聯的pipeline的addLast()方法,是new了一個ChannelInitializer的匿名物件,下面看看ChannelInitializer的繼承關係
addLast最終具體實現如下:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
newContext方法的作用就是對傳入的handler進行包裝,最後返回一個綁定了handler的context物件,也就是DefaultChannelHandlerContext例項,形成和PipeLine中head和tail相似的格式,但是前面也提到過head和tail都有in/outboud屬性,如何對這個Handler設定呢?看到原始碼後真的不得不歎服Netty開發者博大精深!
newContext方法中呼叫了下面構造方法:
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
內部呼叫了父類AbstractChannelHandlerContext構造方法,下面看看isInbound方法
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
接下里看addlast0方法:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
看到這裡就很舒服了,這就是在尾節點前面插入一個節點的操作。
init方法就是給NioServerSocket的pipeLine中新增一個匿名ChannelInitializer類,那麼有一個疑問,該匿名類的initChannel方法什麼時候執行?也就是ServerBootstrapAcceptor什麼時候被新增到pipleLine中?
在ServerBootStarp原始碼分析中提到過,當channel註冊到selector後,會呼叫pipeline.fireChannelRegistered() 方法,如下:
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
引數head 是一個 AbstractChannelHandlerContext 例項, 並且它沒有重寫 invokeChannelRegistered方法,因此呼叫的AbstractChannelHandlerContext中實現的方法。
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
handler() 返回的, 其實就是一開始我們例項化匿名hannelInitializer 物件, 並接著呼叫了 ChannelInitializer.channelRegistered 方法. 繼續進入此方法
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
initChannel(C ch) 這個方法就是我們重寫的方法,在此處呼叫, 當添加了自定義的 ServerBootstrapAcceptor 後, 會刪除 ChannelInitializer 這個 ChannelHandler, 即 remove(ctx);, 因此最後的 Pipeline 。
3. 事件傳輸機制
I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
Outbound
(1). Outbound 事件的傳播方向是 tail -> customContext -> head. 在客戶端啟動分析中當呼叫了 Bootstrap.connect 方法時,就會觸發一個 Connect 請求事件,回顧一下呼叫過程,pipeline 的 connect 程式碼如下:
public final ChannelFuture connect(
SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, localAddress, promise);
}
(2). 當 outbound 事件(這裡是 connect 事件)傳遞到 Pipeline 後, 它其實是以 tail 為起點開始傳播的.而 tail.connect 其實呼叫的是 AbstractChannelHandlerContext.connect 方法:
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
(3). findContextOutbound() 顧名思義, 它的作用是以當前 Context 為起點, 向 Pipeline 中的 Context 雙向連結串列的前端尋找第一個 outbound 屬性為真的 Context(即關聯著 ChannelOutboundHandler 的 Context), 然後返回。找到了一個 outbound 的 Context 後, 就呼叫它的 invokeConnect 方法, 這個方法中會呼叫 Context 所關聯著的 ChannelHandler 的 connect 方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
(4). 如果使用者沒有重寫 ChannelHandler 的 connect 方法, 那麼會呼叫 ChannelOutboundHandlerAdapter 所實現的方法:
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
ChannelOutboundHandlerAdapter.connect 僅僅呼叫了 ctx.connect,因此又回到了(2),繼續尋找outboundhandler,直到 connect 事件傳遞到DefaultChannelPipeline 的雙向連結串列的頭節點, 即 head 中,在head節點中呼叫connect:
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
下面以一幅圖來描述一個整個 Connect 請求事件的處理過程:
和connect類似的Outbound事件還有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
注意handler和context的的區別,比如context中connect方法是事件傳輸的介質,handler中connect是真正的處理事件。 Inbound 事件傳播方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
如果我們捕獲了一個事件, 並且想讓這個事件繼續傳遞下去, 那麼需要呼叫 Context 相應的傳播方法.
總結 對於 Outbound事件:
Outbound 事件是請求事件(由 Connect 發起一個請求, 並最終由 unsafe 處理這個請求)
Outbound 事件的發起者是 Channel
Outbound 事件的處理者是 unsafe
Outbound 事件在 Pipeline 中的傳輸方向是 tail -> head.
在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最後一個 Hnalder, 則需要呼叫 ctx.xxx (例如 ctx.connect) 將此事件繼續傳播下去. 如果不這樣做, 那麼此事件的傳播會提前終止.
Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
對於 Inbound 事件:
Inbound 事件是通知事件, 當某件事情已經就緒後, 通知上層.
Inbound 事件發起者是 unsafe
Inbound 事件的處理者是 Channel, 如果使用者沒有實現自定義的處理方法, 那麼Inbound 事件預設的處理者是 TailContext, 並且其處理方法是空實現.
Inbound 事件在 Pipeline 中傳輸方向是 head -> tail
在 ChannelHandler 中處理事件時, 如果這個 Handler 不是最後一個 Hnalder, 則需要呼叫 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 將此事件繼續傳播下去. 如果不這樣做, 那麼此事件的傳播會提前終止.
Outbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT