1. 程式人生 > >Netty(ChannelHandler 和 ChannelPipeline)

Netty(ChannelHandler 和 ChannelPipeline)

ChannelHandler家族

channel的生命週期

    Interface Channel定義了一組和ChannelInboundHandler API密切相關的簡單但功能強大的狀態模型,其Channel主要有4個狀態。

狀態

描述

ChannelUnregistered

Channel 已經被建立,但還未註冊到EventLoop

ChannelRegistered

Channel 已經被註冊到了EventLoop

ChannelActive

Channel 處於活動狀態(已經連線到它的遠端節點)。它現在可以接收和傳送資料了

ChannelInactive

Channel 沒有連線到遠端節點

    當這些狀態發生改變時,將會生成對應的事件。這些事件將會被轉發給ChannelPipeline中的ChannelHandler,其可以隨後對它們做出響應。 

ChannelHandler的生命週期

interface ChannelHandler定義的生命週期操作如下所示,在ChannelHandler被新增到ChannelPipeline中或者被ChannelPipeline中移除時會呼叫 這些操作。這些方法中的每一個都接受一個ChannelHandlerContext引數。

狀態

描述

handlerAdded

當把 ChannelHandler 新增到 ChannelPipeline 中時被呼叫

handlerRemoved

當從 ChannelPipeline 中移除 ChannelHandler 時被呼叫

exceptionCaught

當處理過程中在 ChannelPipeline 中有錯誤時被呼叫

 ChannelHandler有兩個重要的子介面:

  • ChannelInboundHandler——處理入站資料以及各種狀態變化;
  • ChannelOutboundHandler——處理出站資料並且允許攔截所有的操作。

 ChannelInboundHandler介面

以下方法將會在資料被接收時或者 與其對應的Channel狀態發生改變時被呼叫

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * 當Channel已經註冊到它的EventLoop並且能夠處理I/O時被呼叫
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當Channel從它的EventLoop登出並且無法處理任何I/O時被呼叫
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當Channel處於活動狀態時被呼叫;Channel已經連線/繫結並且已經就緒
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當Channel離開活動狀態並且不再連線它的遠端節點時被呼叫
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當從Channel讀取資料時被呼叫
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * 當Channel上的一個讀操作完成時被呼叫
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當ChannelnboundHandler.fireUserEventTriggered()方法被呼叫時被呼叫,因為一個POJO被傳經了ChannelPipeline
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * 當Channel的可寫狀態發生改變時被呼叫。使用者可以確保寫操作不會完成得太快(以避免發生OutOfMemoryError)或者可以在Channel變為再次可寫時恢復寫入。可以通過呼叫Channel的isWritable()方法來檢測Channel的可寫性。與可寫性相關的閾值可以通過Channel.config().setWriteHighWaterMark()和Channel.config().setWriteLowWater-Mark()方法來設定
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * 如果丟擲一個可丟擲的異常物件,則呼叫。
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler介面

    出站操作和資料將由ChannelOutboundHandler處理。它的方法將被Channel、ChannelPipeline以及ChannelHandlerContext呼叫。ChannelOutboundHandler的一個強大的功能是可以按需推遲操作或者事件,這使得可以通過一些複雜的方法來處理請求。例如,如果到遠端節點的寫入被暫停了,那麼你可以推遲沖刷操作並在稍後繼續。

public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * 當請求將Channel繫結到本地地址時被呼叫
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 當請求將Channel連線到遠端節點時被呼叫
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * 當請求將Channel從遠端節點斷開時被呼叫
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 當請求關閉Channel時被呼叫
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 當請求將Channel從它的EventLoop登出時被呼叫
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * 當請求從Channel讀取更多的資料時被呼叫
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
     * 當請求通過Channel將資料寫到遠端節點時被呼叫
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * 當請求通過Channel將入隊資料沖刷到遠端節點時被呼叫
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

    ChannelPromiseChannelFuture ChannelOutboundHandler中的大部分方法都需要一個ChannelPromise引數,以便在操作完成時得到通知。ChannelPromise是ChannelFuture的一個子類,其定義了一些可寫的方法,如setSuccess()和setFailure(),從而使ChannelFuture不可變。

ChannelHandler介面卡

    ChannelInboundHandlerAdapterChannelOutboundHandlerAdapter類作為自己的ChannelHandler的介面卡類。這兩個介面卡分別提供了ChannelInboundHandler和ChannelOutboundHandler的基本實現。通過擴充套件抽象類ChannelHandlerAdapter,它們獲得了它們共同的超介面ChannelHandler的方法。生成的類的層次結構如:

    ChannelHandlerAdapter還提供了實用方法isSharable()。如果其對應的實現被標註為Sharable,那麼 這個方法將返回true,表示它可以被新增到多個ChannelPipeline中。(@Sharable註解)

    在ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter中所提供的方法體呼叫了其相關聯的ChannelHandlerContext上的等效方法,從而將事件轉發到了ChannelPipeline中的下一個ChannelHandler中。

資源管理

    為了診斷潛在的(資源洩漏)問題,Netty提供了class ResourceLeakDetector它將對你應用程式的緩衝區分配做大約1%的取樣來檢測記憶體洩露。相關的開銷是非常小的。

    Netty目前定義了4種洩漏檢測級別:

級別

描述

DISABLED

禁用洩漏檢測。只有在詳盡的測試之後才應使用

SIMPLE

使用1%的預設取樣率檢測並報告任何發現的洩露。這是預設級別,適合絕大部分情況。

ADVANCED

使用預設的取樣率,報告所發現的任何的洩露以及對應的訊息被訪問的位置

PARANOID

類似於ADVANCED,但是其將會對每次(對訊息的)訪問都進行取樣。這對效能將會有很大影響,在除錯階段使用

洩露檢測級別可以通過將下面的Java系統屬性設定為表中的一個值來定義:

java-Dio.netty.leakDetectionLevel=ADVANCED 

    如果一個訊息被消費或者丟棄了,並且沒有傳遞給ChannelPipeline中的下一個ChannelOutboundHandler,那麼使用者就有責任呼叫ReferenceCountUtil.release()。如果訊息到達了實際的傳輸層,那麼當它被寫入時或者Channel關閉時,都將被自動釋放。

ChannelPipeline介面

    ChannelPipeline是一個攔截流經Channel的入站和出站事件的Channel-Handler例項鏈,那麼就很容易看出這些ChannelHandler之間的互動是組成一個應用程式資料和事件處理邏輯的核心。

    每一個新建立的Channel都將會被分配一個新的ChannelPipeline。這項關聯是永久性的;Channel既不能附加另外一個ChannelPipeline,也不能分離其當前的。在Netty元件的生命週期中,這是一項固定的操作,不需要開發人員的任何干預。

    根據事件的起源,事件將會被ChannelInboundHandler或者ChannelOutboundHandler處理。隨後,通過呼叫ChannelHandlerContext實現,它將被轉發給同一超型別的下一個ChannelHandler。

ChannelHandlerContext:

    ChannelHandlerContext使得ChannelHandler能夠和它的ChannelPipeline以及其他的ChannelHandler互動。ChannelHandler可以通知其所屬的ChannelPipeline中的下一個ChannelHandler,甚至可以動態修改它所屬的ChannelPipeline。         ChannelHandlerContext具有豐富的用於處理事件和執行I/O 操作的API。

    當你完成了通過呼叫ChannelPipeline.add*()方法將入站處理器(ChannelInboundHandler)和出站處理器(ChannelOutboundHandler)混合新增到ChannelPipeline之後,每一個ChannelHandler從頭部到尾端的順序位置正如同我們方才所定義它們的一樣。因此,如果你將圖中的處理器(ChannelHandler)從左到右進行編號,那麼第一個被入站事件看到的ChannelHandler將是1,而第一個被出站事件看到的ChannelHandler將是 5。

    在ChannelPipeline傳播事件時,它會測試ChannelPipeline中的下一個Channel-Handler的型別是否和事件的運動方向相匹配。如果不匹配,ChannelPipeline將跳過該ChannelHandler並前進到下一個,直到它找到和該事件所期望的方向相匹配的為止。

修改 ChannelPipeline

    ChannelHandler可以通過新增、刪除或者替換其他的ChannelHandler來實時地修改ChannelPipeline的佈局。

名稱

描述

 addFirst

 addBefore

 addAfter

 addLast

將一個ChannelHandler 新增到ChannelPipeline 中

remove

將一個ChannelHandler 從 ChannelPipeline 中移除

replace

將 ChannelPipeline 中的一個 ChannelHandler 替換為另一個ChannelHandler

ChannelHandler的執行和阻塞 

    通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O 執行緒)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個執行緒,因為這會對整體的I/O 處理產生負面的影響。但有時可能需要與那些使用阻塞API  的遺留程式碼進行互動。對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法。如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutor-Group的預設實現。

通過型別或者名稱來訪問ChannelHandler的方法:

名稱

描述

 get

 通過型別或者名稱返回 ChannelHandler

 context

 返回和ChannelHandler 繫結的 ChannelHandlerContext

 names

 返回ChannelPipeline 中所有 ChannelHandler 的名稱

觸發事件 

ChannelPipeline的API公開了用於呼叫入站和出站操作的附加方法:

ChannelPipeline的入站操作:

方法名稱 描述
fireChannelRegistered 呼叫ChannelPipeline中下一個ChannelInboundHandler的channelRegistered(ChannelHandlerContext)方法
fireChannelUnregistered 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelUnregistered(ChannelHandlerContext)方法
fireChannelActive 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelActive(ChannelHandlerContext)方法
fireChannelInactive 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelInactive(ChannelHandlerContext)方法
fireExceptionCaught 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireExceptionCaught(ChannelHandlerContext)方法
fireUserEventTriggered 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireUserEventTriggered(ChannelHandlerContext)方法
fireChannelRead 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelRead(ChannelHandlerContext)方法
fireChannelReadComplete 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelReadComplete(ChannelHandlerContext)方法
fireChannelWritabilityChanged 呼叫ChannelPipeline中下一個ChannelInboundHandler的fireChannelWritabilityChanged(ChannelHandlerContext)方法

在出站這邊,處理事件將會導致底層的套接字上發生一系列的動作。

ChannelPipeline的出站操作:

方法名稱 描述
bind 將Channel繫結到一個本地地址,這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的bind(ChannelHandlerContext,Socket-Address, ChannelPromise)方法
connect 將Channel連線到一個遠端地址,這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的connect(ChannelHandlerContext, SocketAddress, ChannelPromise)方法
disconnect 將Channel斷開連線  。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的disconnect(ChannelHandlerContext, ChannelPromise)方法
close 將Channel關閉。 這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的close(ChannelHandlerContext, ChannelPromise)方法
deregister 將Channel從它先前所分配的EventExecutor(即EventLoop)中登出。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的deregister(ChannelHandlerContext, ChannelPromise)方法
flush 沖刷Channel所有掛起的寫入。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的flush(ChannelHandlerContext)方法
write 將訊息寫入Channel。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的write(ChannelHandlerContext, Object msg, Channel-Promise)方法。注意:這並不會將訊息寫入底層的Socket,而只會將它放入佇列中。要將它  寫入Socket,需要呼叫flush()或者writeAndFlush()方法
writeAndFlush 這是一個先呼叫write()方法再接著呼叫flush()方法的便利方法
read 請求從Channel中讀取更多的資料。這將呼叫ChannelPipeline中的下一個ChannelOutboundHandler的read(ChannelHandlerContext)方法

總結:

  • ChannelPipeline儲存了與Channel相關聯的ChannelHandler;
  • ChannelPipeline可以根據需要,通過新增或者刪除ChannelHandler來動態地修改;
  • ChannelPipeline有著豐富的API用以被呼叫,以響應入站和出站事件。

ChannelHandlerContext介面 

    ChannelHandlerContext代表了ChannelHandler和ChannelPipeline之間的關聯,每當有ChannelHandler新增到ChannelPipeline中時,都會建立ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所關聯的ChannelHandler和在同一個ChannelPipeline中的其他ChannelHandler之間的互動。

     ChannelHandlerContext有很多的方法,其中一些方法也存在於Channel和ChannelPipeline本身上,但是有一點重要的不同。如果呼叫Channel或者ChannelPipeline上的這些方法,它們將沿著整個ChannelPipeline進行傳播。而呼叫位於ChannelHandlerContext上的相同方法,則將從當前所關聯的ChannelHandler開始,並且只會傳播給位於該ChannelPipeline中的下一個能夠處理該事件的ChannelHandler。

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    /**
     * 返回繫結到這個例項的Channel
     */
    Channel channel();

    /**
     * 返回排程事件的EventExecutor
     */
    EventExecutor executor();

    /**
     * 返回這個例項的唯一名稱
     */
    String name();

    /**
     * 返回繫結到這個例項的ChannelHandler
     */
    ChannelHandler handler();

    /**
     * 如果所關聯的ChannelHandler已經被從ChannelPipeline中移除則返回true
     */
    boolean isRemoved();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelRegistered()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelRegistered();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelUnregistered()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelUnregistered();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelActive()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelActive();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelInactive()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelInactive();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireExceptionCaught()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    /**
     * 觸發對下一個ChannelInboundHandler上的fireUserEventTriggered()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelRead()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelReadComplete()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelReadComplete();

    /**
     * 觸發對下一個ChannelInboundHandler上的fireChannelWritabilityChanged()方法的呼叫
     */
    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();

    /**
     * 將資料從Channel讀取到第一個入站緩衝區;如果讀取成功則觸發一個channelRead事件,並(在最後一個訊息被讀取完成後)通知ChannelInboundHandler的channelReadComplete(ChannelHandlerContext)方法
     */
    @Override
    ChannelHandlerContext read();

    /**
     * 重新整理所有掛起的訊息。
     */
    @Override
    ChannelHandlerContext flush();

    /**
     * 返回這個例項所關聯的ChannelPipeline
     */
    ChannelPipeline pipeline();

    /**
     * 返回和這個例項相關聯的Channel所配置的ByteBufAllocator
     */
    ByteBufAllocator alloc();

    /******************補充*************************/
    //write  通過這個例項寫入訊息並經過ChannelPipeline
    //writeAndFlush  通過這個例項寫入並沖刷訊息並經過ChannelPipeline

}

使用ChannelHandlerContext的API的時候,請牢記以下兩點:

  • ChannelHandlerContext和ChannelHandler之間的關聯(繫結)是永遠不會改變的,所以快取對它的引用是安全的;
  • 相對於其他類的同名方法,ChannelHandlerContext的方法將產生更短的事件流,應該儘可能地利用這個特性來獲得最大的效能。

 使用 ChannelHandlerContext

修改客戶端的ChildChannelHandler:

private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("客戶端啟動……");
            ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
            ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
            ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
                            Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
                    CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
                    ByteBuf headerBuf=buf;
                    ByteBuf bodyBuf=buf;
                    messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
                    messageBuf.addComponent(headerBuf);
                    for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
                        System.out.println(buf);
                        byte[] req = new byte[buf.readableBytes()];
                        buf.readBytes(req);
                        String body = new String(req, "UTF-8");
                        System.out.println("複合緩衝區:"+body);
                    }
                    ctx.writeAndFlush(buf);
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    ByteBuf copyBuf=((ByteBuf) msg).copy();
//                    System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
//                    buf.release();//釋放引用計數物件
                    for (int i = 0; i < buf.capacity(); i++) {
                        byte b=buf.getByte(i);
                        if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
                        System.out.println("i="+(char)b);
                    }
                    int i=buf.forEachByte(new ByteProcessor() {
                        @Override
                        public boolean process(byte value) throws Exception {
                            byte[] b=",".getBytes();
                            if (b[0]!=value)
                                return true;
                            else
                                return false;
                        }
                    });
                    System.out.println("i="+i+" value="+(char) buf.getByte(i));
                    ByteBuf sliced = buf.slice(0,2);
                    sliced.setByte(0,(byte)'h');
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    ctx.fireChannelRead(copyBuf);
                }
            });
            ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println("text2:"+body);
                    ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
                    ctx.writeAndFlush(bufs);
                    ctx.close();
                }
            });
//            ch.pipeline().remove("text2");
        }
    }

     被呼叫的Channel或ChannelPipeline上的write()方法將一直傳播事件通過整個ChannelPipeline,但是在ChannelHandler的級別上,事件從一個ChannelHandler到下一個ChannelHandler的移動是由ChannelHandlerContext上的呼叫完成的。

    要想呼叫從某個特定的ChannelHandler開始的處理過程,必須獲取到在(ChannelPipeline)該ChannelHandler之前的ChannelHandler所關聯的ChannelHandlerContext。這個ChannelHandlerContext將呼叫和它所關聯的ChannelHandler之後的ChannelHandler。

訊息將從下一個ChannelHandler開始流經ChannelPipeline,繞過了所有前面的ChannelHandler。

    因為一個ChannelHandler可以從  屬於多個ChannelPipeline,所以它也可以繫結到多個ChannelHandlerContext例項。  對於這種用法   指在多個ChannelPipeline中共享同一個ChannelHandler,對應的ChannelHandler必須要使用@Sharable註解標註;否則,試圖將它新增到多個ChannelPipeline時將會觸發異常。顯而易見,為了安全地被用於多個併發的Channel(即連線),這樣的ChannelHandler必須是執行緒安全的。 

只應該在確定了你的ChannelHandler是執行緒安全的時才使用@Sharable註解。

在多個ChannelPipeline中安裝同一個ChannelHandler的一個常見的原因是用於收集跨越多個Channel的統計資訊。

異常處理

處理入站異常

    如果在處理入站事件的過程中有異常被丟擲,那麼它將從它在ChannelInboundHandler裡被觸發的那一點開始流經ChannelPipeline。要想處理這種型別的入站異常,你需要在你的ChannelInboundHandler實現中重寫下面的方法。

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.out.println("伺服器異常..");
        cause.printStackTrace();
        ctx.close();
    }

    因為異常將會繼續按照入站方向流動(就像所有的入站事件一樣),所以實現了前面所示邏輯的ChannelInboundHandler通常位於ChannelPipeline的最後。這確保了所有的入站異常都總是會被處理,無論它們可能會發生在ChannelPipeline中的什麼位置。

  • ChannelHandler.exceptionCaught()的預設實現是簡單地將當前異常轉發給ChannelPipeline中的下一個ChannelHandler;
  • 如果異常到達了ChannelPipeline的尾端,它將會被記錄為未被處理;
  • 要想定義自定義的處理邏輯,你需要重寫exceptionCaught()方法。然後你需要決定是否需要將該異常傳播出去。

處理出站異常 

用於處理出站操作中的正常完成以及異常的選項,都基於以下的通知機制。

  • 每個出站操作都將返回一個ChannelFuture。註冊到ChannelFuture的ChannelFutureListener將在操作完成時被通知該操作是成功了還是出錯了  。
  • 幾乎所有的ChannelOutboundHandler上的方法都會傳入一個ChannelPromise的例項。作為ChannelFuture的子類,ChannelPromise也可以被分配用於非同步通知的監聽器。但是,ChannelPromise還具有提供立即通知的可寫方法:

   ChannelPromise setSuccess();
   ChannelPromise setFailure(Throwable cause);

新增ChannelFutureListener只需要呼叫ChannelFuture例項上的addListener(ChannelFutureListener)方法,並且有兩種不同的方式可以做到這一點。其中最常用的方式是,調用出站操作(如write()方法)所返回的ChannelFuture上的addListener()方法。

方式一:

                    ChannelFuture future=ctx.writeAndFlush(bufs);
                    future.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess())
                                System.out.println("成功");
                            else{
                                System.out.println("失敗");
                                future.cause().printStackTrace();
                                future.channel().close();
                            }
                        }
                    });

 方式二:ChannelFutureListener新增到即將作為引數傳遞給ChannelOutboundHandler的方法的ChannelPromise。

public class OutboundExceptionHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void
    write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        promise.addListener(new ChannelFutureListener() {
            @Override
            public void
            operationComplete(ChannelFuture
                                      f) {
                if (!f.isSuccess()) {
                    f.cause().printStackTrace();
                    f.channel().close();
                }
            }
        });
    }
} 

ChannelPromise的可寫方法

    通過呼叫ChannelPromise上的setSuccess()和setFailure()方法,可以使一個操作的狀態在ChannelHandler的方法返回給其呼叫者時便即刻被感知到。

參考《Netty實戰》

附:

package netty.in.action;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.Charset;

public class EchoServer {

    final ByteBuf bufs= Unpooled.copiedBuffer("Hello,劉德華", Charset.forName("UTF-8"));
    public void bind(int port) throws Exception {
        //配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChildChannelHandler());

            // 繫結埠,同步等待成功
            ChannelFuture f=b.bind(port).sync();

            //等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            //退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("服務端啟動……");
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    if(buf.hasArray()){
                        byte[] array=buf.array();//返回該緩衝區的備份位元組陣列。
                        int offset=buf.arrayOffset()+buf.readerIndex();//計算第一個位元組的偏移量
                        int length=buf.readableBytes();//獲取可讀位元組數
                        String s=new String(array,offset,length);
                        System.out.println("s="+s);
                    }else{
                        byte[] array = new byte[buf.readableBytes()];//獲取可讀位元組數並分配一個新的陣列來儲存
                        buf.getBytes(buf.readerIndex(),array);//將位元組複製到該陣列
                        String s=new String(array,0,buf.readableBytes());
                        System.out.println("直接緩衝區:"+s);
                    }
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    bufs.retain();//引用計數器加一
                    ChannelFuture future=ctx.writeAndFlush(bufs);
                    future.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess())
                                System.out.println("成功");
                            else{
                                System.out.println("失敗");
                                future.cause().printStackTrace();
                                future.channel().close();
                            }
                        }
                    });
//                    ctx.close();
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        int port=8080;
        new EchoServer().bind(port);
    }
}
package netty.in.action;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ByteProcessor;

import java.nio.charset.Charset;

public class EchoClient {
    final ByteBuf buf= Unpooled.copiedBuffer("Hello,王寶強", Charset.forName("UTF-8"));
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO執行緒組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChildChannelHandler() );
            // 發起非同步連線操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("客戶端啟動……");
            ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
            ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
            ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
                            Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
                    CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
                    ByteBuf headerBuf=buf;
                    ByteBuf bodyBuf=buf;
                    messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
                    messageBuf.addComponent(headerBuf);
                    for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
                        System.out.println(buf);
                        byte[] req = new byte[buf.readableBytes()];
                        buf.readBytes(req);
                        String body = new String(req, "UTF-8");
                        System.out.println("複合緩衝區:"+body);
                    }
                    ctx.writeAndFlush(buf);
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    ByteBuf copyBuf=((ByteBuf) msg).copy();
//                    System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
//                    buf.release();//釋放引用計數物件
                    for (int i = 0; i < buf.capacity(); i++) {
                        byte b=buf.getByte(i);
                        if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
                        System.out.println("i="+(char)b);
                    }
                    int i=buf.forEachByte(new ByteProcessor() {
                        @Override
                        public boolean process(byte value) throws Exception {
                            byte[] b=",".getBytes();
                            if (b[0]!=value)
                                return true;
                            else
                                return false;
                        }
                    });
                    System.out.println("i="+i+" value="+(char) buf.getByte(i));
                    ByteBuf sliced = buf.slice(0,2);
                    sliced.setByte(0,(byte)'h');
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    ctx.fireChannelRead(copyBuf);
                }
            });
            ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println("text2:"+body);
                    ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
                    ctx.writeAndFlush(bufs);
                    ctx.close();
                }
            });
//            ch.pipeline().remove("text2");
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port, "127.0.0.1");
    }
}