1. 程式人生 > >Netty入門(3) - ChannelHandler

Netty入門(3) - ChannelHandler

ChannelPipeline

ChannelHandler例項的列表,用於處理或者截獲通道的接收和傳送資料,讓使用者可以在ChannelPipeline中完全控制一個事件以及處理ChannelHandler和ChannelPipeline的互動。

每一個新的通道,都會建立一個新的ChannelPipeline並且附加到通道,永久性的耦合。

一個入站I/O事件,這個事件會從ChannelPipeline的第一個Handler開始一次通過,出站IO事件則從最後一個Handler開始依次通過。Handler可以處理時間並檢查型別,不能處理則跳過,並將事件傳遞給下一個Handler。

我們找到,不能有其他IO-Thread的阻塞拉力影響整體的IO處理,比如JDBC。這時候可以通過一個EventExecutorGroup,自定義的事件會被包含在EventExecutorGroup的EventExecutor處理。

ChannelHandlerContext

通過context,ChannelHandler允許與其他ChannelHandler實現互動:

1、通知下一個ChannelHandler

2、想事件流全部通過ChannelPipeline,可以通過呼叫Channel和ChannelPipeline的方法:

 

3、想事件從ChannelPipeline的指定位置開始:

ChannelHandlerContext是執行緒安全的,可以在外部使用。

修改ChannelPipeline

 呼叫ChannelHandlerContext的pipeline()方法能訪問ChannelPipeline,可以執行時動態調整ChannelHandler。可以保持ChannelHandlerContext供以後使用,執行緒安全:

public class WriteHandler extends ChannelHandlerAdapter {
    
    private ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
    }
    
    public void send(String msg) {
        ctx.write(msg);
    }

}

如果ChannelHandler例項帶有@Sharable註解則可以被新增到多個ChannelPipeline中,也就是說單個ChannelHandler可以有多個ChannelHandlerContext;如果沒有新增該註解的Handler例項新增到多個Pipeline中則會拋異常:

@Sharable
public class NotSharableHandler extends ChannelInboundHandlerAdapter {

    private int count;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        count++;
        System.out.println("channelRead(...) called the " + count + " time");
        ctx.fireChannelRead(msg);
    }
}

ChannelHandler及其子類

ChannelHandlerAdapter

ChannelInboundHandler / ChannelInboundHandlerAdapter 處理完訊息之後不會自動釋放,需要ReferenceCountUtil.release(msg);

ChannelOutboundHandler / ChannelOutboundHandlerAdapter

SimpleChannelInboundHandler<T> / SimpleChannelInboundHandler<T> 處理完訊息之後自動釋放

 

ChannelOutboundHandler所有重要方法採用ChannelPromise,如果ChannelPromise沒有被通知,可能會導致其中一個ChannelFutureListener沒有被通知去處理一個訊息:

public class DiscardOutboundHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ReferenceCountUtil.release(msg);
        promise.setSuccess();
    }
    
}

 

總結一下:一般自定義訊息,使用編碼解碼器實現位元組傳輸,使用ChannelInboundHandlerAdapter/ChannelOutboundHandlerAdapter處理事件或者狀態改變,使用SimpleChannelInboundHandler/SimpleChannelOutboundHandler處理訊息。