1. 程式人生 > >7.Netty中 handler 的執行順序

7.Netty中 handler 的執行順序

什麽 pre art img 代碼 client bind throws cau

1.Netty中handler的執行順序 

  Handler在Netty中,無疑占據著非常重要的地位。Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、

統一對日誌錯誤進行處理、統一對請求進行計數、控制Handler執行與否。一句話,沒有它做不到的只有你想不到的

  Netty中的所有handler都實現自ChannelHandler接口。按照輸入輸出來分,分為ChannelInboundHandler、ChannelOutboundHandler兩大類

ChannelInboundHandler對從客戶端發往服務器的報文進行處理,一般用來執行解碼、讀取客戶端數據、進行業務處理等;ChannelOutboundHandler

對從服務器發往客戶端的報文進行處理,一般用來進行編碼、發送報文到客戶端

  Netty中可以註冊多個handler。ChannelInboundHandler按照註冊的先後順序執行;ChannelOutboundHandler按照註冊的先後順序逆序執行

如下圖所示,按照註冊的先後順序對Handler進行排序,request進入Netty後的執行順序為:

技術分享圖片

2.Netty中handler執行順序代碼示例:

客戶端代碼同上

服務端代碼及業務邏輯類:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;/** * ? 配置服務器功能,如線程、端口 ? 實現服務器處理程序,它包含業務邏輯,決定當有一個請求連接或接收數據時該做什麽 */ public
class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void start() throws Exception { EventLoopGroup eventLoopGroup = null; try { //server端引導類 ServerBootstrap serverBootstrap = new ServerBootstrap(); //連接池處理數據 eventLoopGroup = new NioEventLoopGroup(); serverBootstrap.group(eventLoopGroup) .channel(NioServerSocketChannel.class)//指定通道類型為NioServerSocketChannel,一種異步模式,OIO阻塞模式為OioServerSocketChannel .localAddress("localhost",port)//設置InetSocketAddress讓服務器監聽某個端口已等待客戶端連接。 .childHandler(new ChannelInitializer<Channel>() {//設置childHandler執行所有的連接請求 @Override protected void initChannel(Channel ch) throws Exception { // 註冊兩個InboundHandler,執行順序為註冊順序,所以應該是InboundHandler1 InboundHandler2 ch.pipeline().addLast(new EchoOutHandler1()); ch.pipeline().addLast(new EchoOutHandler2()); // 註冊兩個OutboundHandler,執行順序為註冊順序的逆序,所以應該是OutboundHandler2 OutboundHandler1 ch.pipeline().addLast(new EchoInHandler1()); ch.pipeline().addLast(new EchoInHandler2()); } }); // 最後綁定服務器等待直到綁定完成,調用sync()方法會阻塞直到服務器完成綁定,然後服務器等待通道關閉,因為使用sync(),所以關閉操作也會被阻塞。 ChannelFuture channelFuture = serverBootstrap.bind().sync(); System.out.println("開始監聽,端口為:" + channelFuture.channel().localAddress()); channelFuture.channel().closeFuture().sync(); } finally { eventLoopGroup.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoServer(20000).start(); } }

業務類EchoInHandler1

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

import cn.itcast_03_netty.sendobject.bean.Person;

public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        System.out.println("in1");
         // 通知執行下一個InboundHandler
        ctx.fireChannelRead(msg);//ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新後才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

業務類 EchoInHandler2:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;

public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("in2");
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        System.out.println("接收客戶端數據:" + body);
        //向客戶端寫數據
        System.out.println("server向client發送數據");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp); //ctx.write(msg) 將傳遞到 ChannelOutboundHandler
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();//刷新後才將數據發出到SocketChannel
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

業務邏輯類 EchoOutHandler2

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

     @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("out2");
            // 執行下一個OutboundHandler
            super.write(ctx, msg, promise);
        }
}

業務邏輯類 EchoOutHandler1:

import java.util.Date;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {
    @Override
    // 向client發送消息
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out1");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.write(resp);
        ctx.flush(); //ctx.write()方法執行後,需要調用flush()方法才能令它立即執行
       }
}

運行結果:

開始監聽,端口為:/127.0.0.1:20000
in1
in2
接收客戶端數據:QUERY TIME ORDER
server向client發送數據
out2
out1

總結: 

在使用Handler的過程中,需要註意:

  1、ChannelInboundHandler之間的傳遞,通過調用 ctx.fireChannelRead(msg) 實現;調用ctx.write(msg) 將傳遞到ChannelOutboundHandler

  2、ctx.write()方法執行後,需要調用flush()方法才能令它立即執行。

  3、ChannelOutboundHandler 在註冊的時候需要放在最後一個ChannelInboundHandler之前,否則將無法傳遞到ChannelOutboundHandler

   (流水線pipeline中outhander不能放到最後,否則不生效)

  4、Handler的消費處理放在最後一個處理。

7.Netty中 handler 的執行順序