7.Netty中 handler 的執行順序
1.Netty中handler的執行順序
Handler在Netty中,無疑占據著非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、
統一對日誌錯誤進行處理、統一對請求進行計數、控制Handler執行與否。一句話,沒有它做不到的只有你想不到的
Netty中的所有handler都實現自ChannelHandler接口。按照輸入輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類
ChannelInboundHandler對從客戶端發往服務器的報文進行處理,一般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler
對從服務器發往客戶端的報文進行處理,一般用來進行編碼、發送報文到客戶端
Netty中可以註冊多個handler。ChannelInboundHandler按照註冊的先後順序執行;ChannelOutboundHandler按照註冊的先後順序逆序執行
如下圖所示,按照註冊的先後順序對Handler進行排序,request進入Netty後的執行順序為:
2.Netty中handler執行順序代碼示例:
客戶端代碼同上
服務端代碼及業務邏輯類:
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel;import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;/** * ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有一個請求連接或接收數據時該做什麽 */ publicclass EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //server端引導類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //連接池處理數據 eventLoopGroup = new NioEventLoopGroup(); serverBootstrap.group(eventLoopGroup) .channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel .localAddress("localhost",port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。 .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求 @Override protected void initChannel(Channel ch) throws Exception { // 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2 ch.pipeline().addLast(new EchoOutHandler1()); ch.pipeline().addLast(new EchoOutHandler2()); // 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1 ch.pipeline().addLast(new EchoInHandler1()); ch.pipeline().addLast(new EchoInHandler2()); } }); // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } }
業務類EchoInHandler1:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; import cn.itcast_03_netty.sendobject.bean.Person; public class EchoInHandler1 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("in1"); // 通知執行下一個InboundHandler ctx.fireChannelRead(msg);//ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush();//刷新後才將數據發出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
業務類 EchoInHandler2:
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.Date; public class EchoInHandler2 extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("in2"); ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "UTF-8"); System.out.println("接收客戶端數據:" + body); //向客戶端寫數據 System.out.println("server向client發送數據"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); //用ctx.write(msg) 將傳遞到 ChannelOutboundHandler } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush();//刷新後才將數據發出到SocketChannel } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
業務邏輯類 EchoOutHandler2:
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("out2"); // 執行下一個OutboundHandler super.write(ctx, msg, promise); } }
業務邏輯類 EchoOutHandler1:
import java.util.Date; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter { @Override // 向client發送消息 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { System.out.println("out1"); String currentTime = new Date(System.currentTimeMillis()).toString(); ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp); ctx.flush(); //ctx.write()方法執行後,需要調用flush()方法才能令它立即執行 } }
運行結果:
開始監聽,端口為:/127.0.0.1:20000
in1
in2
接收客戶端數據:QUERY TIME ORDER
server向client發送數據
out2
out1
總結:
在使用Handler的過程中,需要註意:
1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler
2、ctx.write()方法執行後,需要調用flush()方法才能令它立即執行。
3、ChannelOutboundHandler 在註冊的時候需要放在最後一個ChannelInboundHandler之前,否則將無法傳遞到ChannelOutboundHandler
(流水線pipeline中outhander不能放到最後,否則不生效)
4、Handler的消費處理放在最後一個處理。
7.Netty中 handler 的執行順序