1. 程式人生 > 其它 >Netty學習之核心元件ChannelPipeline

Netty學習之核心元件ChannelPipeline

ChannelPipeline 提供了ChannelHandler 鏈的容器,並定義了用於在該鏈上傳播入站和出站事件流的API。ChannelPipeline不是單獨存在,它肯定會和Channel、ChannelHandler、ChannelHandlerContext關聯在一起。

一、ChannelHandler

  1、概述

      

  如上圖所示ChannelHandler下主要是兩個子介面

ChannelInboundHandler(入站):處理輸入資料和Channel狀態型別改變。

介面卡: ChannelInboundHandlerAdapter(介面卡設計模式)

常用的: SimpleChannelInboundHandler

  ChannelOutboundHandler(出站): 處理輸出資料

介面卡: ChannelOutboundHandlerAdapter

  Netty 以介面卡類的形式提供了大量預設的ChannelHandler 實現,幫我們簡化應用程式處理邏輯的開發過程。每一個Handler都一定會處理出站或者入站(可能兩者都處理資料),例如對於入站的Handler可能會繼承SimpleChannelInboundHandler或者ChannelInboundHandlerAdapter,而SimpleChannelInboundHandler又是繼承於ChannelInboundHandlerAdapter,最大的區別在於SimpleChannelInboundHandler會對沒有外界引用的資源進行一定的清理,並且入站的訊息可以通過泛型來規定。

  採用介面卡模式,是因為我們在寫自定義Handel時候,很少會直接實現上面兩個介面,因為介面中有很多預設方法需要實現,所以這裡就採用了設配器模式,ChannelInboundHandlerAdapter和

ChannelInboundHandlerAdapter就是設配器模式的產物,讓它去實現上面介面,實現它所有方法。那麼你自己寫自定義Handel時,只要繼承它,就無須重寫上面介面的所有方法了。

  2、Channel 生命週期(執行順序也是從上倒下)

  • channelRegistered: channel註冊到一個EventLoop。
  • channelActive: 變為活躍狀態(連線到了遠端主機),可以接受和傳送資料
  • channelInactive: channel處於非活躍狀態,沒有連線到遠端主機
  • channelUnregistered: channel已經建立,但是未註冊到一個EventLoop裡面,也就是沒有和Selector繫結

  3、ChannelHandler 生命週期

  • handlerAdded: 當 ChannelHandler 新增到 ChannelPipeline 呼叫
  • handlerRemoved: 當 ChannelHandler 從 ChannelPipeline 移除時呼叫
  • exceptionCaught: 當 ChannelPipeline 執行丟擲異常時呼叫

二、ChannelPipeline

  1、概述

           

  如圖所示ChannelPipeline類是ChannelHandler例項物件的連結串列,用於處理或截獲通道的接收和傳送資料。它提供了一種高階的擷取過濾模式(類似serverlet中的filter功能),讓用

戶可以在ChannelPipeline中完全控制一個事件以及如何處理ChannelHandler與ChannelPipeline的互動。

對於每個新的通道Channel,都會建立一個新的ChannelPipeline,並將器pipeline附加到channel中。下圖描述ChannelHandler與pipeline中的關係,一個io操作可以由一個ChannelInboundHandler或ChannelOutboundHandle進行處理,並通過呼叫ChannelInboundHandler處理入站io或通過ChannelOutboundHandler處理出站IO。

       

  2、常用方法

  addFirst(...)   //新增ChannelHandler在ChannelPipeline的第一個位置
  addBefore(...)   //在ChannelPipeline中指定的ChannelHandler名稱之前新增ChannelHandler
  addAfter(...)   //在ChannelPipeline中指定的ChannelHandler名稱之後新增ChannelHandler
  addLast(...)   //在ChannelPipeline的末尾新增ChannelHandler
  remove(...)   //刪除ChannelPipeline中指定的ChannelHandler
  replace(...)   //替換ChannelPipeline中指定的ChannelHandler

  ChannelPipeline可以動態新增、刪除、替換其中的ChannelHandler,這樣的機制可以提高靈活性。示例:

  ChannelPipeline pipeline = ch.pipeline(); 
  FirstHandler firstHandler = new FirstHandler(); 
  pipeline.addLast("handler1", firstHandler); 
  pipeline.addFirst("handler2", new SecondHandler()); 
  pipeline.addLast("handler3", new ThirdHandler()); 
  pipeline.remove("“handler3“"); 
  pipeline.remove(firstHandler); 
  pipeline.replace("handler2", "handler4", new FourthHandler());

  3、入站出站Handler執行順序

  一般的專案中,inboundHandler和outboundHandler有多個,在Pipeline中的執行順序?InboundHandler順序執行,OutboundHandler逆序執行。如程式碼:

ch.pipeline().addLast(new InboundHandler1());
          ch.pipeline().addLast(new OutboundHandler1());
          ch.pipeline().addLast(new OutboundHandler2());
          ch.pipeline().addLast(new InboundHandler2());
  或者:
          ch.pipeline().addLast(new OutboundHandler1());
          ch.pipeline().addLast(new OutboundHandler2());
          ch.pipeline().addLast(new InboundHandler1());
          ch.pipeline().addLast(new InboundHandler2());     

  其實上面的執行順序都是一樣的:

 InboundHandler1--> InboundHandler2 -->OutboundHandler2 -->OutboundHandler1

  4、ChannelPipeline中channelHandler協作規則

  4.1、業務執行後需要 ChannelHandlerContext.fire*()或者 Channel*Handler.super*(), 否則不會傳遞到下一個handler,如下圖:

    

  4.2、如果outhandler在inhandler之後新增,所有inhandler中的最後一個inhandler需要寫個ctx.channel().write, 這樣能進入outhandler的write()中執行,如下圖:

          

  4.3、在第4.2中,如果不希望使用ctx.channel().write,那麼要觸發outhandler的write操作需要把outhandler在inhandler之前新增到pipeline(重要),該情況下還使用ctx.channel().write,會觸發兩次outhandler.write,如下圖:

          

  通常來說outhandler都放到前面新增。netty findchannelhandler機制尋找讀事件會先找outhanlder的read方法,在inhandler前面新增的outhandler不能在write方法內呼叫fireChannelRead事件,否則將pipeline會進入死迴圈,死迴圈為:outHandler(read)-->inhandler(read)-->outhandler(write)-->inhandler(read) ······最後這次read觸發就是因為outhandler的write方法出現了fireChannelRead事件。

  4.4、outhandler使用ctx.write(msg,promise)傳遞給下一個outhandler,如下圖:

          

  4.5、所有的inhandler的最後一個使用ctx.writeAndFlush(msg)觸發給outhandler所有的outhandler的出去口,outhandler最後也需要通過ctx.writeAndFlush(msg)才能傳送給客戶端。如果多個inhandler執行ctx.writeAndFlush(msg) 客戶端則會收到多個返回資料,因為這樣outhandler會被觸發多次。

  4.6、outBound和Inbound誰先執行,針對客戶端和服務端而言,客戶端是發起請求再接受資料,先outbound再inbound,服務端則相反。

三、ChannelHandlerContext

  ChannelPipeline並不是直接管理ChannelHandler,而是通過ChannelHandlerContext來間接管理,這一點通過ChannelPipeline的預設實現DefaultChannelPipeline可以看出來。

  DefaultChannelHandlerContext和DefaultChannelPipeline是ChannelHandlerContext和ChannelPipeline的預設實現,在DefaultPipeline內部

DefaultChannelHandlerContext組成了一個雙向連結串列。我們看下DefaultChannelPipeline的建構函式:

/**
  * 可以看到,DefaultChinnelPipeline 內部使用了兩個特殊的Hander 來表示Handel鏈的頭和尾。
  */
 public DefaultChannelPipeline(AbstractChannel channel) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        this.channel = channel;
 
        TailHandler tailHandler = new TailHandler();
        tail = new DefaultChannelHandlerContext(this, null, generateName(tailHandler), tailHandler);
 
        HeadHandler headHandler = new HeadHandler(channel.unsafe());
        head = new DefaultChannelHandlerContext(this, null, generateName(headHandler), headHandler);
 
        head.next = tail;
        tail.prev = head;
    }

  所以對於DefaultChinnelPipeline,它的Handler頭部和尾部的Handler是固定的,我們所新增的Handler是新增在這個頭和尾之間的Handler。

        

四、幾者關係

  先大致說下什麼是Channel:通常來說, 所有的 NIO 的 I/O 操作都是從 Channel 開始的,一個 channel 類似於一個 stream。在Netty中,Channel是客戶端和服務端建立的一個連線通道。

雖然java Stream 和 NIO Channel都是負責I/O操作,但他們還是有許多區別的:

  • 我們可以在同一個 Channel 中執行讀和寫操作, 然而同一個 Stream 僅僅支援讀或寫。
  • Channel 可以非同步地讀寫, 而 Stream 是阻塞的同步讀寫。
  • Channel 總是從 Buffer 中讀取資料, 或將資料寫入到 Buffer 中。

  幾者的關係圖如下:

        

  一個Channel包含一個ChannelPipeline,建立Channel時會自動建立一個ChannelPipeline,每個Channel都有一個管理它的pipeline,這關聯是永久性的。

  每一個ChannelPipeline中可以包含多個ChannelHandler。所有ChannelHandler都會順序加入到ChannelPipeline中,ChannelHandler例項與ChannelPipeline之間的橋樑是ChannelHandlerContext例項。

五、傳播過程解釋

  現在通過原始碼將上面的整個傳播流程大致走一遍。先從Channel的抽象子類AbstractChannel開始,下面是AbstractChannel#write()方法的實現:

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    // ...
    @Override
    public Channel write(Object msg) {
        return pipeline.write(msg);
    }
    // ...
}

  AbstractChannel直接呼叫了Pipeline的write()方法:

        

  再看DefaultChannelPipeline的write()方法實現:

final class DefaultChannelPipeline implements ChannelPipeline {
    // ...
    @Override
    public ChannelFuture write(Object msg) {
        return tail.write(msg);
    }
    // ...
}

  因為write是個outbound事件,所以DefaultChannelPipeline直接找到tail部分的context,呼叫其write()方法:

        

  接著看DefaultChannelHandlerContext的write()方法:

final class DefaultChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext {
    // ...
    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
 
    @Override
    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }
 
        validatePromise(promise, true);
 
        write(msg, false, promise);
 
        return promise;
    }
 
    private void write(Object msg, boolean flush, ChannelPromise promise) {
        DefaultChannelHandlerContext next = findContextOutbound();
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    }
 
    private DefaultChannelHandlerContext findContextOutbound() {
        DefaultChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
 
    private void invokeWrite(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
 
    // ...
}

  context的write()方法沿著context鏈往前找,直至找到一個outbound型別的context為止(ctx.outbound),然後呼叫其invokeWrite()方法:

        

  invokeWrite()接著呼叫handler的write()方法:

         

  最後看看ChannelOutboundHandlerAdapter的write()方法實現:

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    // ...
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
    // ...
}

  預設的實現呼叫了context的write()方法而不做任何處理,這樣write事件就沿著outbound鏈繼續傳播:

         

  可見,Pipeline的事件傳播,是靠Pipeline,Context和Handler共同協作完成的。