netty中Pipeline的ChannelHandler執行順序案例詳解
一、netty的Pipeline模型
netty的Pipeline模型用的是責任鏈設計模式,當boss執行緒監控到繫結埠上有accept事件,此時會為該socket連線例項化Pipeline,並將InboundHandler和OutboundHandler按序載入到Pipeline中,然後將該socket連線(也就是Channel物件)掛載到selector上。一個selector對應一個執行緒,該執行緒會輪詢所有掛載在他身上的socket連線有沒有read或write事件,然後通過執行緒池去執行Pipeline的業務流。selector如何查詢哪些socket連線有read或write事件,主要取決於呼叫作業系統的哪種IO多路複用核心,如果是select(注意,此處的select是指作業系統核心的select IO多路複用,不是netty的seletor物件),那麼將會遍歷所有socket連線,依次詢問是否有read或write事件,最終作業系統核心將所有IO事件的socket連線返回給netty程序,當有很多socket連線時,這種方式將會大大降低效能,因為存在大量socket連線的遍歷和核心記憶體的拷貝。如果是epoll,效能將會大幅提升,因為他基於完成埠事件,已經維護好有IO事件的socket連線列表,selector直接取走,無需遍歷,也少掉核心記憶體拷貝帶來的效能損耗。
Pipeline的責任鏈是通過ChannelHandlerContext物件串聯的,ChannelHandlerContext物件裡封裝了ChannelHandler物件,通過prev和next節點實現雙向連結串列。Pipeline的首尾節點分別是head和tail,當selector輪詢到socket有read事件時,將會觸發Pipeline責任鏈,從head開始調起第一個InboundHandler的ChannelRead事件,接著通過fire方法依次觸發Pipeline上的下一個ChannelHandler,如下圖:
ChannelHandler分為InbounHandler和OutboundHandler,InboundHandler用來處理接收訊息,OutboundHandler用來處理髮送訊息。head的ChannelHandler既是InboundHandler又是OutboundHandler,無論是read還是write都會經過head,所以head封裝了unsafe方法,用來操作socket的read和write。tail的ChannelHandler只是InboundHandler,read的Pipleline處理將會最終到達tail。
二、通過六組實驗驗證InboundHandler和OutboundHandler的執行順序
在做實驗之前,先把實驗程式碼貼出來。
EchoServer類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 /** 13 * @ClassName EchoServer 14 * @Description TODO 15 * @Author felix 16 * @Date 2019/9/26 10:37 17 * @Version 1.0 18 **/ 19 public class EchoServer { 20 private int port; 21 22 public EchoServer(int port) { 23 this.port = port; 24 } 25 26 private void run() { 27 EventLoopGroup bossGroup = new NioEventLoopGroup(); 28 EventLoopGroup workGroup = new NioEventLoopGroup(); 29 30 try { 31 ServerBootstrap serverBootstrap = new ServerBootstrap(); 32 serverBootstrap.group(bossGroup, workGroup) 33 .channel(NioServerSocketChannel.class) 34 .childHandler(new ChannelInitializer<SocketChannel>() { 35 @Override 36 protected void initChannel(SocketChannel socketChannel) throws Exception { 37 //outboundhandler一定要放在最後一個inboundhandler之前 38 //否則outboundhandler將不會執行到 39 socketChannel.pipeline().addLast(new EchoOutboundHandler3()); 40 socketChannel.pipeline().addLast(new EchoOutboundHandler2()); 41 socketChannel.pipeline().addLast(new EchoOutboundHandler1()); 42 43 socketChannel.pipeline().addLast(new EchoInboundHandler1()); 44 socketChannel.pipeline().addLast(new EchoInboundHandler2()); 45 socketChannel.pipeline().addLast(new EchoInboundHandler3()); 46 } 47 }) 48 .option(ChannelOption.SO_BACKLOG, 10000) 49 .childOption(ChannelOption.SO_KEEPALIVE, true); 50 System.out.println("EchoServer正在啟動."); 51 52 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); 53 System.out.println("EchoServer繫結埠" + port); 54 55 channelFuture.channel().closeFuture().sync(); 56 System.out.println("EchoServer已關閉."); 57 } catch (Exception e) { 58 e.printStackTrace(); 59 } finally { 60 bossGroup.shutdownGracefully(); 61 workGroup.shutdownGracefully(); 62 } 63 } 64 65 public static void main(String[] args) { 66 int port = 8080; 67 if (args != null && args.length > 0) { 68 try { 69 port = Integer.parseInt(args[0]); 70 } catch (Exception e) { 71 e.printStackTrace(); 72 } 73 } 74 75 EchoServer server = new EchoServer(port); 76 server.run(); 77 } 78 }
EchoInboundHandler1類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelInboundHandlerAdapter; 7 import io.netty.util.CharsetUtil; 8 9 /** 10 * @ClassName EchoInboundHandler1 11 * @Description TODO 12 * @Author felix 13 * @Date 2019/9/26 11:15 14 * @Version 1.0 15 **/ 16 public class EchoInboundHandler1 extends ChannelInboundHandlerAdapter { 17 @Override 18 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 19 System.out.println("進入 EchoInboundHandler1.channelRead"); 20 21 String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8); 22 System.out.println("EchoInboundHandler1.channelRead 收到資料:" + data); 23 ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8)); 24 25 System.out.println("退出 EchoInboundHandler1 channelRead"); 26 } 27 28 @Override 29 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 30 System.out.println("[EchoInboundHandler1.channelReadComplete]"); 31 } 32 33 @Override 34 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 35 System.out.println("[EchoInboundHandler1.exceptionCaught]" + cause.toString()); 36 } 37 }View Code
EchoInboundHandler2類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelInboundHandlerAdapter; 7 import io.netty.util.CharsetUtil; 8 9 /** 10 * @ClassName EchoInboundHandler2 11 * @Description TODO 12 * @Author felix 13 * @Date 2019/9/27 15:35 14 * @Version 1.0 15 **/ 16 public class EchoInboundHandler2 extends ChannelInboundHandlerAdapter { 17 @Override 18 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 19 System.out.println("進入 EchoInboundHandler2.channelRead"); 20 21 String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8); 22 System.out.println("EchoInboundHandler2.channelRead 接收到資料:" + data); 23 //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 24 ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8)); 25 ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 26 27 System.out.println("退出 EchoInboundHandler2 channelRead"); 28 } 29 30 @Override 31 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 32 System.out.println("[EchoInboundHandler2.channelReadComplete]讀取資料完成"); 33 } 34 35 @Override 36 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 37 System.out.println("[EchoInboundHandler2.exceptionCaught]"); 38 } 39 }View Code
EchoInboundHandler3類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerContext; 6 import io.netty.channel.ChannelInboundHandlerAdapter; 7 import io.netty.util.CharsetUtil; 8 9 /** 10 * @ClassName EchoInboundHandler3 11 * @Description TODO 12 * @Author felix 13 * @Date 2019/10/23 13:43 14 * @Version 1.0 15 **/ 16 public class EchoInboundHandler3 extends ChannelInboundHandlerAdapter { 17 @Override 18 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 19 System.out.println("進入 EchoInboundHandler3.channelRead"); 20 21 String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8); 22 System.out.println("EchoInboundHandler3.channelRead 接收到資料:" + data); 23 //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write] [EchoInboundHandler3] " + data, CharsetUtil.UTF_8)); 24 ctx.fireChannelRead(msg); 25 26 System.out.println("退出 EchoInboundHandler3 channelRead"); 27 } 28 29 @Override 30 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 31 System.out.println("[EchoInboundHandler3.channelReadComplete]讀取資料完成"); 32 } 33 34 @Override 35 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 36 System.out.println("[EchoInboundHandler3.exceptionCaught]"); 37 } 38 39 40 }View Code
EchoOutboundHandler1類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelOutboundHandlerAdapter; 6 import io.netty.channel.ChannelPromise; 7 import io.netty.util.CharsetUtil; 8 9 /** 10 * @ClassName EchoOutboundHandler1 11 * @Description TODO 12 * @Author felix 13 * @Date 2019/9/27 15:36 14 * @Version 1.0 15 **/ 16 public class EchoOutboundHandler1 extends ChannelOutboundHandlerAdapter { 17 @Override 18 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 19 System.out.println("進入 EchoOutboundHandler1.write"); 20 21 //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write中的write]", CharsetUtil.UTF_8)); 22 ctx.channel().writeAndFlush(Unpooled.copiedBuffer("在OutboundHandler裡測試一下channel().writeAndFlush", CharsetUtil.UTF_8)); 23 ctx.write(msg); 24 25 System.out.println("退出 EchoOutboundHandler1.write"); 26 } 27 }View Code
EchoOutboundHandler2類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.buffer.Unpooled; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelOutboundHandlerAdapter; 6 import io.netty.channel.ChannelPromise; 7 import io.netty.util.CharsetUtil; 8 9 /** 10 * @ClassName EchoOutboundHandler2 11 * @Description TODO 12 * @Author felix 13 * @Date 2019/9/27 15:36 14 * @Version 1.0 15 **/ 16 public class EchoOutboundHandler2 extends ChannelOutboundHandlerAdapter { 17 18 @Override 19 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 20 System.out.println("進入 EchoOutboundHandler2.write"); 21 22 //ctx.writeAndFlush(Unpooled.copiedBuffer("[第二次write中的write]", CharsetUtil.UTF_8)); 23 ctx.write(msg); 24 25 System.out.println("退出 EchoOutboundHandler2.write"); 26 } 27 }View Code
EchoOutboundHandler3類:
1 package com.wisdlab.nettylab; 2 3 import io.netty.channel.ChannelHandlerContext; 4 import io.netty.channel.ChannelOutboundHandlerAdapter; 5 import io.netty.channel.ChannelPromise; 6 7 /** 8 * @ClassName EchoOutboundHandler3 9 * @Description TODO 10 * @Author felix 11 * @Date 2019/10/23 23:23 12 * @Version 1.0 13 **/ 14 public class EchoOutboundHandler3 extends ChannelOutboundHandlerAdapter { 15 @Override 16 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 17 System.out.println("進入 EchoOutboundHandler3.write"); 18 19 ctx.write(msg); 20 21 System.out.println("退出 EchoOutboundHandler3.write"); 22 } 23 24 }View Code
實驗一:在InboundHandler中不觸發fire方法,後續的InboundHandler還能順序執行嗎?
如上圖所示,InboundHandler2沒有呼叫fire方法:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 System.out.println("進入 EchoInboundHandler1.channelRead"); 3 4 String data = ((ByteBuf)msg).toString(CharsetUtil.UTF_8); 5 System.out.println("EchoInboundHandler1.channelRead 收到資料:" + data); 6 //ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler1] " + data, CharsetUtil.UTF_8)); 7 8 System.out.println("退出 EchoInboundHandler1 channelRead"); 9 }
那麼InboundHandler中的程式碼還會被執行到嗎?看一下執行結果:
由上圖可知,InboundHandler2沒有呼叫fire事件,InboundHandler3沒有被執行。
結論:InboundHandler是通過fire事件決定是否要執行下一個InboundHandler,如果哪個InboundHandler沒有呼叫fire事件,那麼往後的Pipeline就斷掉了。
實驗二:InboundHandler和OutboundHandler的執行順序是什麼?
加入Pipeline的ChannelHandler的順序如上圖所示,那麼最後執行的順序如何呢?執行結果如下:
由上圖可知,執行順序為:
InboundHandler1 => InboundHandler2 => OutboundHandler1 => OutboundHander2 => OutboundHandler3 => InboundHandler3
所以,我們得到以下幾個結論:
1、InboundHandler是按照Pipleline的載入順序,順序執行。
2、OutboundHandler是按照Pipeline的載入順序,逆序執行。
實驗三:如果把OutboundHandler放在InboundHandler的後面,OutboundHandler會執行嗎?
執行結果如下:
由此可見,OutboundHandler沒有執行,為什麼呢?因為Pipleline是執行完所有有效的InboundHandler,再返回執行在最後一個InboundHandler之前的OutboundHandler。注意,有效的InboundHandler是指fire事件觸達到的InboundHandler,如果某個InboundHandler沒有呼叫fire事件,後面的InboundHandler都是無效的InboundHandler。為了印證這一點,我們繼續做一個實驗,我們把其中一個OutboundHandler放在最後一個有效的InboundHandler之前,看看這唯一的一個OutboundHandler是否會執行,其他OutboundHandler是否不會執行。
執行結果如下:
由此可見,只執行了OutboundHandler1,其他OutboundHandler沒有被執行。
所以,我們得到以下幾個結論:
1、有效的InboundHandler是指通過fire事件能觸達到的最後一個InboundHander。
2、如果想讓所有的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler之前。
3、推薦的做法是通過addFirst載入所有OutboundHandler,再通過addLast載入所有InboundHandler。
實驗四:如果其中一個OutboundHandler沒有執行write方法,那麼訊息會不會發送出去?
我們把OutboundHandler2的write方法注掉
1 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 2 System.out.println("進入 EchoOutboundHandler3.write"); 3 4 //ctx.write(msg); 5 6 System.out.println("退出 EchoOutboundHandler3.write"); 7 }
執行結果如下:
可以看到,OutboundHandler3並沒有被執行到,另外,客戶端也沒有收到傳送的訊息。
所以,我們得到以下幾個結論:
1、OutboundHandler是通過write方法實現Pipeline的串聯的。
2、如果OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有呼叫write方法,最終訊息將不會發送出去。
實驗五:ctx.writeAndFlush 的OutboundHandler的執行順序是什麼?
我們設定ChannelHandler在Pipeline中的載入順序如下:
OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3
在InboundHander2中呼叫ctx.writeAndFlush:
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 System.out.println("進入 EchoInboundHandler2.channelRead"); 3 4 String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8); 5 System.out.println("EchoInboundHandler2.channelRead 接收到資料:" + data); 6 ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 7 //ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8)); 8 ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 9 10 System.out.println("退出 EchoInboundHandler2 channelRead"); 11 }
執行結果如下:
由上圖可知,依次執行了OutboundHandler2和OutboundHandler3,為什麼會這樣呢?因為ctx.writeAndFlush是從當前的ChannelHandler開始,向前依次執行OutboundHandler的write方法,所以分別執行了OutboundHandler2和OutboundHandler3:
OutboundHandler3 => InboundHandler1 => OutboundHandler2 => InboundHandler2 => OutboundHandler1 => InboundHandler3
所以,我們得到如下結論:
1、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。
2、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。
實驗六:ctx.channel().writeAndFlush 的OutboundHandler的執行順序是什麼?
還是實驗五的程式碼,不同之處只是把ctx.writeAndFlush修改為ctx.channel().writeAndFlush。
1 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 2 System.out.println("進入 EchoInboundHandler2.channelRead"); 3 4 String data = ((ByteBuf) msg).toString(CharsetUtil.UTF_8); 5 System.out.println("EchoInboundHandler2.channelRead 接收到資料:" + data); 6 //ctx.writeAndFlush(Unpooled.copiedBuffer("[第一次write] [EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 7 ctx.channel().writeAndFlush(Unpooled.copiedBuffer("測試一下channel().writeAndFlush", CharsetUtil.UTF_8)); 8 ctx.fireChannelRead(Unpooled.copiedBuffer("[EchoInboundHandler2] " + data, CharsetUtil.UTF_8)); 9 10 System.out.println("退出 EchoInboundHandler2 channelRead"); 11 }
執行結果如下:
由上圖可知,所有OutboundHandler都執行了,由此我們得到結論:
1、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其他OutboundHandler,即使最後一個ChannelHandler是OutboundHandler,在InboundHandler之前,也會執行該OutbondHandler。
2、千萬不要在OutboundHandler的write方法裡執行ctx.channel().writeAndFlush,否則就死迴圈了。
三、總結
1、InboundHandler是通過fire事件決定是否要執行下一個InboundHandler,如果哪個InboundHandler沒有呼叫fire事件,那麼往後的Pipeline就斷掉了。
2、InboundHandler是按照Pipleline的載入順序,順序執行。
3、OutboundHandler是按照Pipeline的載入順序,逆序執行。
4、有效的InboundHandler是指通過fire事件能觸達到的最後一個InboundHander。
5、如果想讓所有的OutboundHandler都能被執行到,那麼必須把OutboundHandler放在最後一個有效的InboundHandler之前。
6、推薦的做法是通過addFirst載入所有OutboundHandler,再通過addLast載入所有InboundHandler。
7、OutboundHandler是通過write方法實現Pipeline的串聯的。
8、如果OutboundHandler在Pipeline的處理鏈上,其中一個OutboundHandler沒有呼叫write方法,最終訊息將不會發送出去。
9、ctx.writeAndFlush是從當前ChannelHandler開始,逆序向前執行OutboundHandler。
10、ctx.writeAndFlush所在ChannelHandler後面的OutboundHandler將不會被執行。
11、ctx.channel().writeAndFlush 是從最後一個OutboundHandler開始,依次逆序向前執行其他OutboundHandler,即使最後一個ChannelHandler是OutboundHandler,在InboundHandler之前,也會執行該OutbondHandler。
12、千萬不要在OutboundHandler的write方法裡執行ctx.channel().writeAndFlush,否則就死迴圈了。
&n