1. 程式人生 > >netty(五) ChannelPipeLine和ChannelHandler學習

netty(五) ChannelPipeLine和ChannelHandler學習

ChannelPipeline和ChannelHandler學習

根據java NIO思想,不管是從檔案中讀寫資料還是從網路中讀寫資料,都是藉助於一個Channel來進行的。 即:File <–> Channel <–> 程式 在netty框架中,netty將Channel的資料管道(Channel中包含很多資訊)抽象成一個ChannelPipeline,用於描述資料在管道中的流入流出。類比Java Web中的Filter和攔截器,可以在訊息最終到達目的地之前可以對訊息進行一個過濾和處理,那麼netty框架中的訊息在ChannelPipeline中流動和傳遞的時候,也是可以加一些“攔截器”對訊息進行攔截並進行加工,在netty框架中起攔截訊息作用的就是ChannelHandler。

1 ChannelPipeLine

ChannelPipeline是對資料管道的一個抽象,資料要想從網路達到程式或者從程式達到網路,必須經過ChannelPipeline才可以。 但是在前面的例子中並沒有直接繫結ChannelPipeline,從前面的描述當中可以猜測ChannelPipeline和Channel有關,因此可以猜想當繫結Channel的時候,可能也綁定了ChannelPipeline,即,當建立NioServerSocketChannel物件的時候會隨帶著建立一個ChannelPipeline物件,那麼可以先看看當建立NioServerSocketChannel的時候都幹了什麼事情。會發現當建立NioServerSocketChannel物件的時候,會首先執行父類構造器,那麼一直找,會發現會先執行NioServerSocketChannel的父類AbstractChannel的構造器。

//例項化NioServerSocketChannel
public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) {
    super((Channel)null, channel, 16);
    this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket());
}

//父類
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int
readInterestOp) { super(parent, ch, readInterestOp); } //父類 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); ... } //父類 protected AbstractChannel(Channel parent) { this.parent = parent; this.unsafe = this.newUnsafe(); this.pipeline = this.newChannelPipeline(); }

從這裡可以發現,當建立一個NioServerSocketChannel物件的時候,會首先建立一個ChannelPipeline物件(DefaultChannelPipeline是ChannelPipeline的子類):

//建立一個雙向連結串列
protected DefaultChannelPipeline(Channel channel) {
    this.channel = (Channel)ObjectUtil.checkNotNull(channel, "channel");
    this.tail = new DefaultChannelPipeline.TailContext(this);
    this.head = new DefaultChannelPipeline.HeadContext(this);
    this.head.next = this.tail;
    this.tail.prev = this.head;
}

這個雙向連結串列上的節點其實就是一個個的ChannelHandler。

因此,可以這麼說,當客戶端向服務端傳送一個連線請求,或者客戶端與服務端建立一個網路連線進行IO操作的時候,會首先建立一個Channel,前者建立的時NioServerSocketChannel,後者會建立一個NioSocketChannel,不管是何種Channel,在建立Channel物件的時候,會首先建立一個ChannelPipeline物件用於描述資料傳輸通道,並將其作為Channel的一個屬性;在建立ChannelPipeline物件的時候,其內部又建立了一個雙向連結串列,用於儲存多個ChannelHandler,在資料從CHannelPipeline中流動的時候,對其進行攔截並進行加工或者處理。 它們之間的關係如圖所示: 這裡寫圖片描述

2 inbound事件和outbound事件

在netty框架中,事件整體可以分為兩大類inbound事件和outbound事件,inbound事件對應的就是鏈路的建立,讀操作,鏈路關閉,異常通知等事件;outbound事件對應的是發起IO操作,寫操作,繫結,訊息傳送等事件,雖然只有這兩類,但是有時候會記不住,這個時候可以類比Java IO中的輸入輸出流,inbound事件基本上都是從通道中拿資料,相當於in;outbound基本上都是往通道里傳送資料,因此相當於out。

在netty框架中,當inbound事件發生時,會觸發以下方法:

ChannelInboundHandler.fireChannelRegistered()    //Channel註冊的時候會被呼叫
ChannelInboundHandler.fireChannelActive()      //TCP鏈路建立成功時,會被呼叫
ChannelInboundHandler.fireChannelRead(Object)   //讀事件發生時呼叫
ChannelInboundHandler.fireChannelReadComplete()   //讀事件完成的時候呼叫
ChannelInboundHandler.fireException()     //發生異常的時候會被呼叫
ChannelInboundHandler.fireUserEventTriggered(Object)  //使用者自定義事件發生的時候被呼叫
ChannelInboundHandler.fireChannelInactive()   //TCP鏈路關閉的時候會被呼叫
ChannelInboundHandler.fireChannelWritabilityChanged()  //寫狀態變化事件時被呼叫

以上這些方法 在netty框架中,當outbound事件發生時,會觸發以下方法:

ChannelOutboundHandler.bind()   //bind事件
ChannelOutboundHandler.connect    //連線事件
ChannelOutboundHandler.write()   //寫操作事件
ChannelOutboundHandler.read()    //讀事件
ChannelOutboundHandler.flush()    //重新整理事件
ChannelOutboundHandler.disconnect()   //斷開連線事件
ChannelOutboundHandler.writeAndFlush   
ChannelOutboundHandler.deregister()  //登出
ChannelOutboundHandler.close()    //關閉事件

有點疑惑:就是read事件的時候為什麼outbound事件所屬方法被呼叫?沒有理解

ChannelHandler相當於一個攔截器,對資料進行加工和處理,但是如果沒有觸發事件的話,這些ChannelHandler是不會被執行的,我想這就是學習inbound事件和outbound事件的原因吧。

3 ChannelHandler

ChannelHandler相當與Java Web應用的攔截器,在請求到來前後和響應回去前後,進行連線做一些我們自己想要它做的操作,通常情況一個ChannelHandler僅僅會處理特定的事件,對於其他事件它是不處理的,會交給下一個ChannelHandler處理,比如:在Channel上發生讀操作時,想及時獲取客戶端傳過來的訊息,並做必要的處理等等。

由於netty中事件分為inbound事件和outbound事件,而handler又是因事件而出發的,因此handler是分類的:能夠被inbound事件觸發的handler,能夠被outbound事件觸發的handler,能夠被inbound和outbound事件觸發的handler

就像netty(一)中的例子那樣,如果關注與inbound事件的話,那麼就可以整合ChannelInboundHandlerAdapter類,並重寫對應的方法,比如當channel註冊事件發生時,想進行一些處理,那麼就重寫channelRegistered()方法即可,在Channel註冊的時候會自動呼叫該方法,那麼重寫的業務邏輯就會被執行。

當需要關注於多個事件的時候,為了保證ChannelHandler的專注性,可以建立多個ChannelHandler,然後新增到Pipeline中的那個雙向連結串列中,當資料從通道流動的時候,依據事件會因此觸發這個雙向連結串列中的各個handler。

4 ChannelHandler的觸發流程

4.1 handler處理outbound事件流程

bind()事件,當該Channel上bind事件發生之後,因為bind事件時outbout事件,會找到該Channel中的那個handler構成的雙向連結串列,從尾節點開始,一次查詢每一個Handler,如果發現這個handler能夠處理outbound事件,那麼,呼叫基於這個handler物件呼叫bind()方法,如果這個handler使我們自己新建的handler,並且重寫了其中的bind()方法,那麼根據多型就會執行我們自己重寫的業務邏輯,因此就會按照我們自己的意志來完成一些操作。 來看看程式碼:

//1
serverBootstrap.bind();

//2
serverBootstrap.doBdind();

//3
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
    //這裡會呼叫一個NioEventLoop執行緒去執行bind操作
    channel.eventLoop().execute(new Runnable() {
        public void run() {
            if (regFuture.isSuccess()) {
                //呼叫的時Channel的bind()方法
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }

        }
    });
}

...

//呼叫Channel所屬的pipeline的bind()方法
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return this.pipeline.bind(localAddress, promise);
}

//從這裡可以看出來,會從pipeline中的雙向連結串列中找,
//入口是tail即尾節點
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return this.tail.bind(localAddress, promise);
}

//上面也已經解釋過了
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    } else if (this.isNotValidPromise(promise, false)) {
        return promise;
    } else {
        //找到能夠處理outbound事件的handler
        final AbstractChannelHandlerContext next = this.findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeBind(localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                public void run() {
                    next.invokeBind(localAddress, promise);
                }
            }, promise, (Object)null);
        }

        return promise;
    }
}

//執行handler的invokeBind
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (this.invokeHandler()) {
        try {
            //這裡就會執行hendler的bind()方法,如果重寫了的話,就會執行我們重寫的bind
            ((ChannelOutboundHandler)this.handler()).bind(this, localAddress, promise);
        } catch (Throwable var4) {
            notifyOutboundHandlerException(var4, promise);
        }
    } else {
        this.bind(localAddress, promise);
    }

}

4.2 handler處理inbound事件流程

以鏈路建立為例,前面已經說過了,在Server端建立了兩個執行緒池NioEventLoop,其中AbstractBootstrap.parentGroup用於監聽埠,在啟動的時候會建立一個ServerNioSocketChannel通道,一旦這個Channel通道建立的時候就會出發

serverBootstrap.bind();

//serverBootstrap.initAndRegister
final ChannelFuture initAndRegister() {

    ChannelFuture regFuture = this.group().register(channel);
}

//呼叫NioEventLoopGroup中的register
 public ChannelFuture register(Channel channel) {
    return this.next().register(channel);
}

//呼叫NioEventLoop的register
public ChannelFuture register(Channel channel, ChannelPromise promise) {
    if (channel == null) {
        throw new NullPointerException("channel");
    } else if (promise == null) {
        throw new NullPointerException("promise");
    } else {
        channel.unsafe().register(this, promise);
        return promise;
    }
}

//AbstractUnsafe.register -> AbstractUnsafe.register0
private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !this.ensureOpen(promise)) {
            return;
        }

        boolean firstRegistration = this.neverRegistered;
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        //Channel註冊事件觸發
        AbstractChannel.this.pipeline.fireChannelRegistered();
        if (AbstractChannel.this.isActive()) {
            if (firstRegistration) {
                //鏈路建立成功事件,觸發此方法
                AbstractChannel.this.pipeline.fireChannelActive();
            } else if (AbstractChannel.this.config().isAutoRead()) {
                this.beginRead();
            }
        }
    } catch (Throwable var3) {
        this.closeForcibly();
        AbstractChannel.this.closeFuture.setClosed();
        this.safeSetFailure(promise, var3);
    }

}

5 總結

SocketChannel包含pipeline,pipeline包含handler,它們的關係就是這樣,客戶端與服務端互動的時候實際上走的就是SocketChannel中的Pipeline,在資料流動的過程中會形成一個個的事件,並依次會觸發pipeline中的handler,handler會根據事件,觸發不同的方法。

因此,我們可以通過重寫handler,將我們定義的業務邏輯加到這裡面,在合適的時機被執行。

netty框架中的事件有inbounf事件和outbound事件,如果我們對inbound事件感興趣的話,那麼可以繼承ChannelInboundHandler,並重寫其中的特定動作能夠觸發的方法,比如我們對inbound事件中的讀事件感興趣,那麼就可以重寫channelRead()方法,從通道中讀取客戶端傳過來的資料。如果對outbound事件感興趣,可以繼承ChannelOutboundHandler,並重寫其中的特定動作能夠觸發的方法。

6 參考資料

《Netty權威指南》