1. 程式人生 > >Pipeline inbound(netty原始碼7)

Pipeline inbound(netty原始碼7)

netty原始碼死磕7 

Pipeline 入站流程詳解

1. Pipeline的入站流程

在講解入站處理流程前,先腦補和鋪墊一下兩個知識點:

(1)如何向Pipeline新增一個Handler節點

(2)Handler的出站和入站的區分方式

1.1. HandlerContext節點的新增

在Pipeline例項建立的同時,Netty為Pipeline建立了一個Head和一個Tail,並且建立好了連結關係。

程式碼如下:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);


    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

也就是說,在加入業務Handler之前,Pipeline的內部雙向連結串列不是一個空連結串列。而新加入的Handler,加入的位置是,插入在連結串列的倒數第二個位置,在Tail的前面。

wpsF713.tmp

加入Handler的程式碼,在DefaultChannelPipeline類中。

具體的程式碼如下:

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {

     //檢查重複
        checkMultiplicity(handler);
        //建立上下文
        newCtx = newContext(group, filterName(name, handler), handler);
         //加入雙向連結串列
        addLast0(newCtx);
        //…
    }
    callHandlerAdded0(newCtx);
    return this;
}

加入之前,首先進行Handler的重複性檢查。非共享型別的Handler,只能被新增一次。如果當前要新增的Handler是非共享的,並且已經新增過,那就丟擲異常,否則,標識該handler已經新增。

什麼是共享型別,什麼是非共享型別呢?先聚焦一下主題,後面會詳細解答。

檢查完成後,給Handler建立包裹上下文Context,然後將Context加入到雙向列表的尾部Tail前面。

程式碼如下:

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

這裡主要是通過調整雙向連結的指標,完成節點的插入。如果對雙向連結串列不熟悉,可以自己畫畫指向變化的草圖,就明白了。

1.2. Context的出站和入站的型別

對於入站和出站,Pipeline中兩種不同型別的Handler處理器,出站Handler和入站Handler。

入站(inBound)事件Handler的基類是 ChannelInboundHandler,出站(outBound)事件Handler的基類是 ChannelOutboundHandler。

處理入站(inBound)事件,最典型的就是處理Channel讀就緒事件,還有就是業務處理Handler。處理出站outBound操作,最為典型的處理,是寫資料到Channel。

對應於兩種Handler處理器的Context 包裹器,更加需要區分入站和出站。對Context的區分方式,又是什麼呢?

首先,需要在Context加了一組boolean型別判斷屬性,判斷出站和入站的型別。這組屬性就是——inbound、outbound。這組屬性,定義在上下文包裹器的基類中——ContextAbstractChannelHandlerContext 定義。它們在建構函式中進行初始化。

ContextAbstractChannelHandlerContext 的構造器程式碼如下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private final boolean inbound;
private final boolean outbound;

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
   //….
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
   //…
}

//…

}
對於通用的預設包裹器,繼承了ContextAbstractChannelHandlerContext 基類,並且在自己的構造器中,初始化這兩個父類屬性的方法,如下:

final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

 //…
private final ChannelHandler handler;
 private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
 }

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
 super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
   //….

this.handler = handler;

}

}
從上面的程式碼可以看出, 通用的包裹器DefaultChannelHandlerContext ,通過自己的isInbound()、isOutbound()方法的返回值,對建構函式引數中的Handler 型別進行判斷,來設定分類的boolean型別屬性inbound、outbound的值。

再看兩個非通用的HandlerContext——head和tail。

在HeadContext,則呼叫父類構造器的第五個引數(outbound)的值為true,表示Head是一個出站型別的Context。程式碼如下:

final class HeadContext extends AbstractChannelHandlerContext
        implements ChannelOutboundHandler, ChannelInboundHandler {
    private final Unsafe unsafe;
    HeadContext(DefaultChannelPipeline pipeline) {

//父類構造器
        super(pipeline, null, HEAD_NAME, false, true);
//...

}

}

在TailContext,則呼叫父類構造器的第四個引數(inbound)的值為true,表示Tail是一個入站型別的Context。程式碼如下:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, TAIL_NAME, true, false);
//...

   }

}

無論是哪種型別的handler,Pipeline沒有單獨和分開的入站和出站連結串列,都是統一在一個雙向連結串列中進行管理。

下圖中,使用紫色代表入站Context,橙色代表出站Context。

wpsF724.tmp

在上圖中,橙色表示出站Context,紫色表示入站Context。

在上圖中的流程中,區分一個 ChannelHandlerContext到底是in(入站)還是out(出站) ,使用的是Context的isInbound() 和 isOutbound() 這一組方法。

贅述一下:

Tail是出站執行流程的啟動點,但是,它最後一個入站處理器。

Hearder,是入站流程的啟動起點,但是,它最後一個出站處理器。

感覺,有點兒饒。容易讓人混淆。看完整個的入站流程和出站流程的詳細介紹,就清楚了。

1.3. 入站操作的全流程

入站事件前面已經講過,流向是從Java 底層IO到ChannelHandler。入站事件的型別包括連線建立和斷開、讀就緒、寫就緒等。

基本上,,在處理流程上,大部分的入站事件的處理過程,是一致的。

通用的入站Inbound事件處理過程,大致如下(使用IN_EVT符號代替一個通用事件):

(1)pipeline.fireIN_EVT

(2)AbstractChannelHandlerContext.invokeIN_EVT(head, msg);

(3)context.invokeIN_EVT(msg);

(4)handler.IN_EVT

(5)context.fireIN_EVT(msg);

(6)Connect.findContextInbound()

(7)context.invokeIN_EVT(msg);

上面的流程,如果短時間內看不懂,沒有關係。可以先看一個例子,再回來推敲學習這個通用流程。

1.4. 讀就緒事件的流程例項

下面以最為常見和最好理解的事件——讀就緒的事件為例,將Inbound事件做一個詳細的描述。

整個讀就緒的入站處理流程圖,如下:


inbount 1


1.5. 入站源頭的Java底層 NIO封裝

入站事件處理的源頭,在Channel的底層Java NIO 就緒事件。

Netty對底層Java NIO的操作類,進行了封裝,封裝成了Unsafe系列的類。比方說,AbstractNioByteChannel 中,就有一個NioByteUnsafe 類,封裝了底層的Java NIO的底層Byte位元組的讀取操作。

為什麼叫Unsafe呢?

很簡單,就是在外部使用,是不安全的。Unsafe就是隻能在Channel內部使用的,在Netty 外部的應用開發中,不建議使用。Unsafe包裝了底層的資料讀取工作,包裝在Channel中,不需要應用程式關心。應用程式只需要從快取中,取出快取資料,完成業務處理即可。

Channel 讀取資料到快取後,下一步就是呼叫Pipeline的fireChannelRead()方法,從這個點開始,正式開始了Handler的入站處理流程。

從Channel 到Pipeline這一段,Netty的程式碼如下:

public abstract class AbstractNioByteChannel extends AbstractNioChannel

{

 protected class NioByteUnsafe extends AbstractNioUnsafe {

     @Override
    public final void read() {
final ChannelPipeline pipeline = pipeline();
        ……
      // 讀取結果.

      byteBuf = allocHandle.allocate(allocator);
      ……
      int localReadAmount = doReadBytes(byteBuf);
      ………

  // 通過pipeline dispatch(分發)結果到Handler

  pipeline.fireChannelRead(byteBuf);
      ……
   }

//通過重寫newUnsafe() 方法

//取得內部類NioSocketChannelUnsafe的例項
@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

…

}
channel呼叫了 pipeline.fireChannelRead(byteBuf)後,進入pipeline 開始處理。這是流程的真正啟動的動作。

1.6. Head是入站流程的起點

前面分析到,Pipeline中,入站事件處理流程的處理到的第一個Context是Head。

這一點,從DefaultChannelPipeline 原始碼可以得到驗證,如下所示:

public class DefaultChannelPipeline implements ChannelPipeline

{

…

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

…

}

Pipeline將內部連結串列的head頭作為引數,傳入了invokeChannelRead的靜態方法中。

就像開動了流水線的開關,開啟了整個的流水線的迴圈處理。


1.7. 小迭代的五個動作

一個Pipeline上有多個InBound Handler,每一個InBound Handler的處理,可以算做一次迭代,也可以說成小迭代。

每一個迭代,有四個動作。這個invokeIN_EVT方法,是整個四個動作的小迭代的起點。

四個動作,分別如下:

(1)invokeChannelRead(next, msg)

(2)context.invokeIN_EVT(msg);

(3)handler.IN_EVT

(4)context.fireIN_EVT(msg);

(5)Connect.findContextInbound()

區域性的流程圖如下:

inbount 2

整個五個動作中,只有第三步在Handler中,其他的四步都在Context中完成。

1.8. 流水線小迭代的第一步

invokeChannelRead(next,msg) 靜態方法,非常關鍵,其重要作為是:作為流水線迭代處理的每一輪迴圈小迭代的第一步。在Context的抽象基類中,原始碼如下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{
//...
static void invokeChannelRead(final AbstractChannelHandlerContext next, final Object msg) {
  ……
next.invokeChannelRead(msg);
  ……
}

//...
}
首先,這個是一個靜態方法。

其次,這個方法沒有啥特別。只是做了一個二轉。將處理傳遞給context例項,呼叫context例項的invokeChannelRead方法。強調一下,使用了同一個名稱哈。但是後邊的invokeChannelRead,是一個例項方法,而且只有一個引數。

1.9. context.invokeIN_EVT例項方法

流水線小迭代第二步,觸發當前的Context例項的IN_EVT操作。

對於IN_EVT為ChannelRead的時候,第二步方法為invokeChannelRead,其原始碼如下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private void invokeChannelRead(Object msg) {
    ……
       ((ChannelInboundHandler) handler()).channelRead(this, msg);
……
}

}

這一步很簡單,就是將context和msg(byteBuf)作為引數,傳遞給Handler例項,完成業務處理。

在Handler中,可以獲取到以上兩個引數例項,作為業務處理的輸入。在業務Handler中的IN_EVT方法中,可以寫自己的業務處理邏輯。

1.10. 預設的handler.IN_EVT 入站處理操作

流水線小迭代第三步,完後Context例項中Handler的IN_EVT業務操作。

如果Handler中的IN_EVT方法中沒有寫業務邏輯,則Netty提供了預設的實現。預設原始碼在ChannelInboundHandlerAdapter 介面卡類中。

當IN_EVT為ChannelRead的時候,第三步的預設實現原始碼如下:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

{

//預設的通道讀操作
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

//...

}

讀完原始碼發現,這份預設原始碼,都沒有做什麼實際的處理。

唯一的乾的活,就是呼叫ctx.fireChannelRead(msg),將msg通過context再一次發射出去。

進入第四步。

1.11. context.fireIN_EVT再發射訊息

流水線小迭代第四步,尋找下家,觸發下一家的入站處理。

整個是流水線的流動的關鍵一步,實現了向下一個HandlerContext的流動。

原始碼如下:

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext

{

private final boolean inbound;
private final boolean outbound;

//...

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

//..

}

第四步還是在ChannelInboundHandlerAdapter 介面卡中定義。首先通過第五步,找到下一個Context,然後回到小迭代的第一步,完成了小迭代的一個閉環。

這一步,對於業務Handler而言,很重要。

在使用者Handler中,如果當前 Handler 需要將此事件繼續傳播下去,則呼叫contxt.fireIN_EVT方法。如果不這樣做, 那麼此事件的流水線傳播會提前終止。

1.12. findContextInbound()找下家

第五步是查詢下家。

程式碼如下:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler

{

//...

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

}

這個是一個標準的連結串列查詢操作。this表示當前的context,this.next表示下一個context。通過while迴圈,一直往流水線的下邊找,知道查詢到下一個入站Context為止。

假定流水下如下圖所示:

wpsF775.tmp

在上圖中,如果當前context是head,則下一個是Decoder;如果當前context是Decoder,則下一個是Business;如果當前context是Business,則下一個是Tail。

第五步,是在第四步呼叫的。

找到之後,第四步通過 invokeChannelRead(findContextInbound(), msg)這個靜態方法的呼叫,由回到小迭代的第一步,開始下一輪小的執行。

1.13. 最後一輪Context處理

我們在前面講到,在Netty中,Tail是最後一個IN boundContext。

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

protected void onUnhandledInboundMessage(Object msg) {
  //…

        //釋放msg的引用計數
        ReferenceCountUtil.release(msg);
  //..
}

}

在最後的一輪入站處理中。Tail沒有做任何的業務邏輯,僅僅是對msg 釋放一次引用計數。

這個msg ,是從channel 入站源頭的過來的byteBuf。有可能是引用計數型別(ReferenceCounted)型別的快取,則需要釋放其引用。如果不是ReferenceCounted,則什麼也不做。

關於快取的引用計數,後續再開文章做專題介紹。

1.14. 小結

對入站(Inbound )事件的處理流程,做一下小節:

Inbound 事件是通知事件,當某件事情已經就緒後,從Java IO 通知上層Netty  Channel。

Inbound 事件源頭是 Channel內部的UNSafe;

Inbound 事件啟動者是 Channel,通過Pipeline. fireIN_EVT啟動。

Inbound 事件在 Pipeline 中傳輸方向是從 head 到 tail。

Inbound 事件最後一個的處理者是 TailContext, 並且其處理方法是空實現。如果沒有其他的處理者,則對Inbound ,TailContext是唯一的處理者。

Inbound 事件的向後傳遞方法是contxt.fireIN_EVT方法。在使用者Handler中,如果當前 Handler 需要將此事件繼續傳播下去,則呼叫contxt.fireIN_EVT方法。如果不這樣做, 那麼此事件的流水線傳播會提前終止。