Netty原始碼分析之ChannelPipeline—入站事件的傳播
之前的文章中我們說過ChannelPipeline作為Netty中的資料管道,負責傳遞Channel中訊息的事件傳播,事件的傳播分為入站和出站兩個方向,分別通知ChannelInboundHandler與ChannelOutboundHandler來觸發對應事件。這篇文章我們先對Netty中入站事件的傳播,也就是ChannelInboundHandler進行下分析:
1、入站事件傳播示例
我們通過一個簡單的例子看下ChannelPipeline中入站事件channelRead的傳播
public class ServerApp { public static void main(String[] args) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup work = new NioEventLoopGroup(2); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, work).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); // p.addLast(new LoggingHandler(LogLevel.INFO)); // 向ChannelPipeline中新增自定義channelHandler p.addLast(new ServerHandlerA()); p.addLast(new ServerHandlerB()); p.addLast(new ServerHandlerC()); } }); bootstrap.bind(8050).sync(); } catch (Exception e) { // TODO: handle exception } } } public class ServerHandlerA extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } @Override public void channelActive(ChannelHandlerContext ctx) { ctx.channel().pipeline().fireChannelRead("hello word"); } } public class ServerHandlerB extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } } public class ServerHandlerC extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object object) { System.out.println(this.getClass().getName() + "--"+object.toString()); ctx.fireChannelRead(object); } }
客戶端連線服務後可看到輸出結果
io.netty.example.echo.my.ServerHandlerA--hello word io.netty.example.echo.my.ServerHandlerB--hello word io.netty.example.echo.my.ServerHandlerC--hello word
通過輸出結果我們可以看到,訊息會根據向ChannelPipeline中新增自定義channelHandler的順序傳遞,並通過實現channelRead介面處理訊息接收事件的。在例子中channelRead事件的傳遞是通過ctx.fireChannelRead(object)方法實現,接下來我們就從這裡入手看下ChannelPipeline事件傳遞的具體實現。
2、channelRead事件的傳播
首先這裡需要注意的是我們例子中第一個節點的傳遞與實際應用中入站資料的傳遞是通過ChannelPipeline的fireChannelRead方法實現的,因為在實際的應用中,入站事件的傳遞是由NioUnsafe的read介面實現發起的,需要保證訊息是從head結點開始傳遞的,例子中是為了模擬這一過程。
ctx.channel().pipeline().fireChannelRead("hello word");
@Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg);//預設傳入head節點 return this; }
進入invokeChannelRead方法內部看下具體實現;
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { //ObjectUtil.checkNotNull 判斷傳入的訊息資料是否為空 //next.pipeline.touch 對訊息型別進行判斷 final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor();//獲取ChannelHandlerContext對應的執行緒 if (executor.inEventLoop()) {//是否為當前執行緒 next.invokeChannelRead(m);//呼叫ChannelHandlerContext中invokeChannelRead的回撥方法 } else { executor.execute(new Runnable() {//如果執行緒不是當前執行緒 @Override public void run() { next.invokeChannelRead(m); } }); } }
其中invokeChannelRead方法會獲取該ChannelHandlerContext所封裝的handler實現;
private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //獲取封裝的ChannelInboundHandler實現,並呼叫我們實現的channelRead方法, ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } }
前面我們知道首先傳入的ChannelPipeline中ChannelHandlerContext連結串列的head頭部節點HeadContext,看下其channelRead的方法實現;
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ctx.fireChannelRead(msg); }
呼叫當前ChannelHandlerContext的fireChannelRead方法,進入ctx.fireChannelRead(object)方法內部看下具體的原始碼實現;
@Override public ChannelHandlerContext fireChannelRead(final Object msg) { //開始訊息傳遞,findContextInbound方法按順序獲取當前ChannelHandlerContext的next節點 invokeChannelRead(findContextInbound(), msg); return this; }
findContextInbound方法獲取的是HeadContext的下一個節點,也就是我們例子中向ChannelPipeline中新增自定義ServerHandlerA;
到這裡其實就可以看出Pipeline中channelRead事件的傳播主要就是通過ctx.fireChannelRead(msg),獲取當前ChannelHandlerContext下一個節點中封裝的ChannelInboundHandler來實現的,最後一步一步傳遞到Tail尾部節點。
3、資源的釋放及SimpleChannelInboundHandler
Netty中物件的生命週期由它們的引用計數管理的,為保證入站物件資源被釋放,我們需要通過ReferenceCountUtil.release方法減少引用計數,確保物件的的最終計數器最後被置為0,從而被回收釋放。我們看下Netty在入站事件中預設是如何減少引用計數的。
第一種方法,如果我們跟上面示例一樣,在實現的每一個ChannelInboundHandler中都呼叫了ctx.fireChannelRead(msg),最後訊息會被傳遞到Tail尾節點,我們看下Tail節點中的channelRead方法
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) { onUnhandledInboundMessage(msg); } protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
Tail節點的channelRead方法最終會呼叫ReferenceCountUtil.release方法來減少引用計數的,所以如果你在處理入站訊息的過程中沒有增加引用並且通過ctx.fireChannelRead(msg)方法把訊息傳到了Tail節點,你就不需要自己顯式呼叫ReferenceCountUtil.release方法了。
其次如果繼承的是SimpleChannelInboundHandler,可以看到SimpleChannelInboundHandler的channelRead方法實現中也已經呼叫了ReferenceCountUtil.release方法來減少引用計數;
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
所以關於入站訊息的資源釋放方式總結如下:
- 1、繼承ChannelInboundHandlerAdapter ,在channelRead的方法實現中呼叫ctx.fireChannelRead(object)方法,把訊息一直向下傳遞,直到傳遞到Tail尾部節點,由Tail節點執行 ReferenceCountUtil.release來減少計數器,保證資源釋放;
- 2、繼承SimpleChannelInboundHandler,SimpleChannelInboundHandler本身的ChannelRead方法中會執行 ReferenceCountUtil.release來減少引用;
- 3、如果以上兩點都沒有做到,那就需要手動呼叫ReferenceCountUtil.release來減少引用來釋放資源;
到這裡我們基本瞭解了ChannelPipeline中入站事件是如何傳播與相應的的,以及Netty中入站訊息的資源釋放機制。其中如有不足與不正確的地方還望指出與海涵。
關注微信公眾號,檢視更多技術文章。
&n