Netty原始碼解析 -- ChannelPipeline機制與讀寫過程
阿新 • • 發佈:2020-11-08
本文繼續閱讀Netty原始碼,解析ChannelPipeline事件傳播原理,以及Netty讀寫過程。
**原始碼分析基於Netty 4.1**
#### ChannelPipeline
Netty中的ChannelPipeline可以理解為攔截器鏈,維護了一個ChannelHandler連結串列,ChannelHandler即具體攔截器,可以在讀寫過程中,對資料進行處理。
ChannelHandler也可以分為兩類。
**ChannelInboundHandler**,監控Channel狀態變化,如channelActive,channelRegistered,通常通過重寫ChannelOutboundHandler#channelRead方法處理讀取到的資料,如HttpObjectDecoder將讀取到的資料解析為(netty)HttpRequest。
**ChannelOutboundHandler**,攔截IO事件,如bind,connect,read,write,通常通過重寫ChannelInboundHandler#write方法處理將寫入Channel的資料。如HttpResponseEncoder,將待寫入的資料轉換為Http格式。
ChannelPipeline的預設實現類為DefaultChannelPipeline,它在ChannelHandler連結串列首尾維護了兩個特殊的ChannelHandler -- HeadContext,TailContext。
HeadContext負責將IO事件轉發給對應的UnSafe處理,例如前面文章中說到的register,bind,read等操作。
TailContext主要是一些兜底處理,如channelRead方法釋放ByteBuf的引用等。
##### 事件傳播
**ChannelOutboundInvoker**負責觸發ChannelOutboundHandler的方法,他們方法名相同,只是ChannelOutboundInvoker方法中少了ChannelHandlerContext引數。
同樣,**ChannelInboundInvoker**負責觸發ChannelInboundHandler的方法,但ChannelInboundInvoker的方法名多了fire,如ChannelInboundInvoker#fireChannelRead方法,觸發ChannelInboundHandler#channelRead。
**ChannelPipeline**和**ChannelHandlerContext**都繼承了這兩個介面。
但他們作用不同,ChannelPipeline是攔截器鏈,實際請求委託給ChannelHandlerContext處理。
ChannelHandlerContext介面(即ChannelHandler上下文)維護了連結串列的上下節點,它作為ChannelHandler方法引數, 負責與ChannelPipeline及其他 ChannelHandler互動。通過它可以動態修改Channel的屬性,給EventLoop提交任務,也可以向下一個(上一個)ChannelHandler傳播事件。
例如,在ChannelInboundHandler#channelRead處理完資料後,可以通過ChannelHandlerContext#write將資料寫到Channel。
ChannelInboundHandler#handler方法返回真正的ChannelHandler,並使用該ChannelHandler執行實際操作。
通過DefaultChannelPipeline#addFirst等方法新增ChannelHandler時,Netty會為ChannelHandler構造一個DefaultChannelHandlerContext,handler方法返回對應的ChannelHandler。
HeadContext,TailContext也實現了AbstractChannelHandlerContext,handler方法返回自身this。
我們也可以通過ChannelHandlerContext給EventLoop提交非同步任務
```
ctx.channel().eventLoop().execute(new Runnable() {
public void run() {
...
}
});
```
對於阻塞時間較長的操作,使用非同步任務完成是不錯的選擇。
下面以DefaultChannelPipeline#fireChannelRead為例,看一下他們的事件傳播過程。
DefaultChannelPipeline
```
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
```
使用HeadContext作為開始節點,呼叫AbstractChannelHandlerContext#invokeChannelRead方法開始呼叫攔截器連結串列。
AbstractChannelHandlerContext
```
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
...
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// #1
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
```
`#1`
handler方法獲取AbstractChannelHandlerContext真正的Handler,再觸發其ChannelPipeline#channelRead方法
由於invokeChannelRead方法在HeadContext中執行,`handler()`這裡返回HeadContext,這時會觸發HeadContext#channelRead
HeadContext#channelRead
```
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
```
HeadContext方法呼叫`ctx.fireChannelRead(msg)`,就是向下一個ChannelInboundHandler傳播事件。
AbstractChannelHandlerContext#fireChannelRead
```
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
```
`AbstractChannelHandlerContext#fireChannelRead(final Object msg)`方法主要負責找到下一個ChannelInboundHandler,並觸發其channelRead方法。
從DefaultChannelPipeline#fireChannelRead方法可以看到一個完整的呼叫鏈路:
`#1` DefaultChannelPipeline通過HeadContext開始呼叫
`#2` ChannelInboundHandler處理完當前邏輯後,呼叫`ctx.fireChannelRead(msg)`向後傳播事件
`#4` AbstractChannelHandlerContext找到下一個ChannelInboundHandler,並觸發其channelRead,從而保證攔截器鏈繼續執行。
注意:對於ChannelOutboundHandler中的方法,DefaultChannelPipeline從TailContext開始呼叫,並向前傳播事件,與ChannelInboundHandler方向相反。
大家在閱讀Netty原始碼時,對於DefaultChannelPipeline的方法,要注意該方法底層呼叫是ChannelInboundHandler還是ChannelOutboundHandler的方法,以及他們的傳播方向。
如果我們定義一個Http回聲程式,示意程式碼如下
```
new ServerBootstrap().group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelIni