1. 程式人生 > >Netty學習(三)-Netty重要介面講解

Netty學習(三)-Netty重要介面講解

上一節我們寫了一個HelloWorld,對於Netty的執行有了一定的瞭解,知道Netty是如何啟動客戶端和伺服器端。這一節我們簡要的講解一下幾個重要的介面,初步探討Netty的執行機制,當然剛學Netty就深入原理肯定是很枯燥的,所以我們就點到為止。

1. ChannelPipeLine和ChannelHandler

在上一篇中我們在ChannelInitializer類的initChannel方法中使用了ChannelPipeline,然後在ChannelPipeline中使用了handler來處理業務邏輯。

ChannelPipeline是ChannelHandler的容器,它負責ChannelHandler的管理和事件攔截與排程。Netty的ChannelPipeline和ChannelHandler機制類似於Servlet 和Filter 過濾器,這類攔截器實際上是職責鏈模式的一種變形,主要是為了方便事件的攔截和使用者業務邏輯的定製。

Netty的channel運用機制和Filter過濾器機制一樣,它將Channel 的資料管道抽象為ChannelPipeline. 訊息在ChannelPipeline中流動和傳遞。ChannelPipeline 持有I/O事件攔截器ChannelHandler 的連結串列,由ChannelHandler 對I/0 事件進行攔截和處理,可以方便地通過新增和刪除ChannelHandler 來實現小同的業務邏輯定製,不需要對已有的ChannelHandler進行修改,能夠實現對修改封閉和對擴充套件的支援。

通過一張圖我們來看一下他們之間的關係:

一個Channel中包含一個ChannelPipeline,用來處理Channel中的事件,一個ChannelPipeline中可以包含很多個handler,第二節的示例程式碼中我們也看到了,使用各種handler來處理通訊資訊。

同時我們也注意到在hadler中繼承了ChannelInboundHandlerAdapter類並實現了他的一些方法,比如:channelRead,channelActive,channelInactive等等,我們看到這些方法中都有一個引數:ChannelHandlerContext ctx。這個ChannelHandlerContext就是handler的上下文物件,有了這個ChannelHandlerContext你就獲得了一切,你可以獲得通道,獲得事件的控制權。

事實上,使用者不需要自己建立pipeline,因為使用ServerBootstrap 或者Bootstrap 啟動
服務端或者客戶端時, Netty 會為每個Channel 連線建立一個獨立的pipeline。

ChannelPipeline pipeline = socketChannel.pipeline();

pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192,Delimiters.lineDelimiter()));
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());

// 客戶端的邏輯
pipeline.addLast("handler", new HelloWorldClientHandler());

ChannelPipeline 是執行緒安全的, 這意味著N個業務執行緒可以併發地操作ChannelPipeline
而不存在多執行緒併發問題。但是,ChannelHandler卻不是執行緒安全的,這意味著儘管
ChannelPipeline 是執行緒去全的, 但是仍然需要自己保證ChannelHandler的執行緒安全。

Netty 中的事件分為inbound 事件和outbound 事件。inbound 事件通常由I/O執行緒觸發,例如TCP 鏈路建立事件、鏈路關閉事件、讀事件、異常通知事件等。Outbound 事件通常是I/O 使用者主動發起的網路I/O 操作,例如使用者發起的連線操作、繫結操作、訊息傳送等操作。

我們常用的inbound事件有:

  • ChannelHandlerContext fireChannelRegistered() //channel註冊事件
  • ChannelHandlerContext fireChannelActive() //channel啟用事件
  • ChannelHandlerContext fireExceptionCaught(Throwable var1) //channel異常處理事件
  • ChannelHandlerContext fireUserEventTriggered(Object var1) //使用者自定義事件
  • ChannelHandlerContext fireChannelRead(Object var1) //讀事件

pipeline 中以fireXXX命名的方法都是從I/O 執行緒流向使用者業務Handler的inbound 事件,它們的實現因功能而異,但是處理步驟類似:

  1. 呼叫HeadHandler對應的fireXXX 方法

  2. 執行事件相關的邏輯操作

常用的outbound事件有:

  • ChannelFuture bind(SocketAddress var1, ChannelPromise var2) //繫結地址
  • ChannelFuture connect(SocketAddress var1, ChannelPromise var2) //連線伺服器
  • ChannelFuture write(Object var1) //傳送事件
  • ChannelHandlerContext flush() //重新整理事件

上面我們說到事件,netty的事件機制是由前至後的,一般來說,都是一個channel的ChannnelActive方法中呼叫fireChannelActive來觸發呼叫下一個handler中的ChannelActive方法,即你在ChannelPipeline中新增handler的時候,要在第一個handler的channelActive方法中呼叫fireChannelActive,以此來觸發下一個事件。我們再來寫一個案例說明一下:

客戶端:

public class HWClient {
    private  int port;
    private  String address;

    public HWClient(int port, String address) {
        this.port = port;
        this.address = address;
    }

    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ClientChannelInitializer());

        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        HWClient client = new HWClient(7788,"127.0.0.1");
        client.start();
    }
}

客戶端ClientChannelInitializer:

public class ClientChannelInitializer extends  ChannelInitializer<SocketChannel> {

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 客戶端的handler
        //先呼叫handler在ChannnelActive方法中呼叫fireChannelActive會啟用handler1
        pipeline.addLast("handler", new HWClientHandler());
        pipeline.addLast("handler1", new BaseClientHandler());
    }
}

客戶端handler:

public class HWClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server say : "+msg.toString());
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Handler1");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Client is close");
    }
}

客戶端的第二個handler:

public class BaseClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Handler2");
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

服務端:

public class HWServer {
    private int port;

    public HWServer(int port) {
        this.port = port;
    }

    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();

        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                                    .channel(NioServerSocketChannel.class)
                                    .childHandler(new ServerChannelInitializer());

        try {
            ChannelFuture future = server.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        HWServer server = new HWServer(7788);
        server.start();
    }
}

服務端ServerChannelInitializer:

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // 字串解碼 和 編碼
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());

        // 自己的邏輯Handler
        pipeline.addLast("handler", new HWServerHandler());
    }
}

服務端handler:

public class HWServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+"===>server: "+msg.toString());
        ctx.write("received your msg");
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

我們啟動服務端和客戶端,會發現客戶端的兩個handler都通過了。

先呼叫HWClientHandler,打印出:HWClientHandler channelActive;繼而呼叫了BaseClientHandler ,打印出:BaseClient1Handler channelActive.