1. 程式人生 > >Netty 解碼器抽象父類 ByteToMessageDecoder 源碼解析

Netty 解碼器抽象父類 ByteToMessageDecoder 源碼解析

cep 轉換 cached 運算 ont 直接 小細節 message TP

技術分享圖片

前言

Netty 的解碼器有很多種,比如基於長度的,基於分割符的,私有協議的。但是,總體的思路都是一致的。

拆包思路:當數據滿足了 解碼條件時,將其拆開。放到數組。然後發送到業務 handler 處理。

半包思路: 當讀取的數據不夠時,先存起來,直到滿足解碼條件後,放進數組。送到業務 handler 處理。

而實現這個邏輯的就是我們今天的主角:ByteToMessageDecoder。

看名字的意思是:將字節轉換成消息的解碼器。人如其名。而他本身也是一個入站 handler,所以,我們還是從他的 channelRead 方法入手。

1. channelRead 方法

精簡過的代碼如下:

public
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 從對象池中取出一個List CodecOutputList out = CodecOutputList.newInstance(); ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { // 第一次解碼 cumulation = data;// 累計 } else { // 第二次解碼,就將 data 向 cumulation 追加,並釋放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } // 得到追加後的 cumulation 後,調用 decode 方法進行解碼 // 解碼過程中,調用 fireChannelRead 方法,主要目的是將累積區的內容 decode 到 數組中 callDecode(ctx, cumulation, out); // 如果累計區沒有可讀字節了 if (cumulation != null && !cumulation.isReadable()) { // 將次數歸零
numReads = 0; // 釋放累計區 cumulation.release(); // 等待 gc cumulation = null; } // 如果超過了 16 次,就壓縮累計區,主要是將已經讀過的數據丟棄,將 readIndex 歸零。 else if (++ numReads >= discardAfterReads) { numReads = 0; discardSomeReadBytes(); } int size = out.size(); // 如果沒有向數組插入過任何數據 decodeWasNull = !out.insertSinceRecycled(); // 循環數組,向後面的 handler 發送數據,如果數組是空,那不會調用 fireChannelRead(ctx, out, size); // 將數組中的內容清空,將數組的數組的下標恢復至原來 out.recycle(); }

樓主已經在方法中寫了註釋,但還是說說主要的步驟:

  1. 從對象池中取出一個空的數組。
  2. 判斷成員變量是否是第一次使用,(註意,既然使用了成員變量,所以這個 handler 不能是 handler 的。)將 unsafe 中傳遞來的數據寫入到這個 cumulation 累積區中。
  3. 寫到累積區後,調用子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就調用後面節點的 channelRead 方法。若沒有解碼成功,什麽都不做。
  4. 如果累積區沒有未讀數據了,就釋放累積區。
  5. 如果還有未讀數據,且解碼超過了 16 次(默認),就對累積區進行壓縮。將讀取過的數據清空,也就是將 readIndex 設置為0.
  6. 設置 decodeWasNull 的值,如果上一次沒有插入任何數據,這個值就是 ture。該值在 調用 channelReadComplete 方法的時候,會觸發 read 方法(不是自動讀取的話),嘗試從 JDK 的通道中讀取數據,並將之前的邏輯重來。主要應該是怕如果什麽數據都沒有插入,就執行 channelReadComplete 會遺漏數據。
  7. 調用 fireChannelRead 方法,將數組中的元素發送到後面的 handler 中。
  8. 將數組清空。並還給對象池。

下面來說說詳細的步驟。

2. 從對象池中取出一個空的數組

代碼:

@1
CodecOutputList out = CodecOutputList.newInstance();
@2
static CodecOutputList newInstance() {
    return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3
private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL =
        new FastThreadLocal<CodecOutputLists>() {
            @Override
            protected CodecOutputLists initialValue() throws Exception {
                // 16 CodecOutputList per Thread are cached.
                return new CodecOutputLists(16);
            }
        };
@4
CodecOutputLists(int numElements) {
    elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)];
    for (int i = 0; i < elements.length; ++i) {
        // Size of 16 should be good enough for the majority of all users as an initial capacity.
        elements[i] = new CodecOutputList(this, 16);
    }
    count = elements.length;
    currentIdx = elements.length;
    mask = elements.length - 1;
}
@5
private CodecOutputList(CodecOutputListRecycler recycler, int size) {
    this.recycler = recycler;
    array = new Object[size];
}

@6
public CodecOutputList getOrCreate() {
    if (count == 0) {
        // Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
        // low.
        return new CodecOutputList(NOOP_RECYCLER, 4);
    }
    --count;

    int idx = (currentIdx - 1) & mask;
    CodecOutputList list = elements[idx];
    currentIdx = idx;
    return list;
}

代碼分為 1,2,3,4,5, 6 步驟。

  1. 靜態方法調用。
  2. 從 FastThreadLocal 中取出一個 CodecOutputLists 對象,並從這個集合中再取出一個 List。也就是 List 中有 List。可以理解為雙重數組。
  3. 調用 FastThreadLocal 的 initialValue 方法返回一個 CodecOutputLists 對象。
  4. 創建數組。數組大小默認16,循環填充 CodecOutputList 元素。設置 count,currentIdx ,mask 屬性。
  5. 創建 CodecOutputList 對象,這個 recycler 就是他的父 CodecOutputLists,並創建一個默認 16 的空數組。
  6. 首次進入 count 不是0,應該是 16,隨後將 count -1,並與運算出 Lists 中的下標,獲取到下標的內容。也就是一個 List。在調用 recycle 方法還給對象池的時候,會將所有參數恢復。

由於這個 getOrCreate 方法會被一個線程的多個地方使用,因此 16 是個統計值。當 16 不夠的時候,就會創建一個新的 List。也就是 count == 0 的邏輯。而 & mask 的操作就是一個取模的操作。

3. 寫入累積區

代碼如下:

cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);

這個 cumulator 默認是個 Cumulator 類型的 MERGE_CUMULATOR,該實例最主要的是從重寫了 cumulate 方法:

public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        final ByteBuf buffer;
        if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
            buffer = expandCumulation(alloc, cumulation, in.readableBytes());
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);
        in.release();
        return buffer;
    }
};

可以看到該方法,主要是將 unsafe.read 傳遞過來的 ByteBuf 的內容寫入到 cumulation 累積區中,然後釋放掉舊的內容,由於這個變量是成員變量,因此可以多次調用 channelRead 方法寫入。

同時這個方法也考慮到了擴容的問題,總的來說就是 copy。

當然,ByteToMessageDecoder 中還有一個 Cumulator 實例,稱之為 COMPOSITE_CUMULATOR,混合累積。由於上個實例的 cumulate 方法是使用內存拷貝的,因此,這裏提供了使用混合內存。相較於拷貝,性能會更好點,但同時也會更復雜。

4. decode 方法的作用

當數據追擊到累積區之後,需要調用 decode 方法進行解碼,代碼如下:

@ 1
callDecode(ctx, cumulation, out);

@2
 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // 如果累計區還有可讀字節
    while (in.isReadable()) {
        int outSize = out.size();
        // 上次循環成功解碼
        if (outSize > 0) {
            // 調用後面的業務 handler 的  ChannelRead 方法
            fireChannelRead(ctx, out, outSize);
            // 將 size 置為0
            out.clear();//
            if (ctx.isRemoved()) {
                break;
            }
            outSize = 0;
        }
        // 得到可讀字節數
        int oldInputLength = in.readableBytes();
        // 調用 decode 方法,將成功解碼後的數據放入道 out 數組中,可能會刪除當前節點,刪除之前會將數據發送到最後的 handler
        decodeRemovalReentryProtection(ctx, in, out);// decode()
        if (ctx.isRemoved()) {
            break;
        }
        if (outSize == out.size()) {
            if (oldInputLength == in.readableBytes()) {
                break;
            } else {
                continue;
            }
        }
        if (isSingleDecode()) {
            break;
        }
    }
}

該方法主要邏輯:只要累積區還有未讀數據,就循環進行讀取。

  1. 調用 decodeRemovalReentryProtection 方法,內部調用了子類重寫的 decode 方法,很明顯,這裏是個模板模式。decode 方法的邏輯就是將累積區的內容按照約定進行解碼,如果成功解碼,就添加到數組中。同時該方法也會檢查該 handler 的狀態,如果被移除出 pipeline 了,就將累積區的內容直接刷新到後面的 handler 中。

  2. 如果 Context 節點被移除了,直接結束循環。如果解碼前的數組大小和解碼後的數組大小相等,且累積區的可讀字節數沒有變化,說明此次讀取什麽都沒做,就直接結束。如果字節數變化了,說明雖然數組沒有增加,但確實在讀取字節,就再繼續讀取。

  3. 如果上面的判斷過了,說明數組讀到數據了,但如果累積區的 readIndex 沒有變化,則拋出異常,說明沒有讀取數據,但數組卻增加了,子類的操作是不對的。

  4. 如果是個單次解碼器,解碼一次就直接結束了。

所以,這段代碼的關鍵就是子類需要重寫 decode 方法,將累積區的數據正確的解碼並添加到數組中。每添加一次成功,就會調用 fireChannelRead 方法,將數組中的數據傳遞給後面的 handler。完成之後將數組的 size 設置為 0.

所以,如果你的業務 handler 在這個地方可能會被多次調用。也可能一次也不調用。取決於數組中的值。當然,如果解碼 handler 被移除了,就會將累積區的所有數據刷到後面的 handler。

5. 剩下的邏輯

上面的邏輯就是解碼器最主要的邏輯:

將 read 方法的數據讀取到累積區,使用解碼器解碼累積區的數據,解碼成功一個就放入到一個數組中,並將數組中的數據一次次的傳遞到後面的handler。

從上面的邏輯看,除非 handler 被移除,否則不會調用後面的 handler 方法,也就是說,只要不滿足解碼器的解碼規則,就不會傳遞給後面的 handler。

再看看後面的邏輯,主要在 finally 塊中:

  1. 如果累積區沒有可讀數據了,將計數器歸零,並釋放累積區。
  2. 如果不滿足上面的條件,且計數器超過了 16 次,就壓縮累積區的內容,壓縮手段是刪除已讀的數據。將 readIndex 置為 0。還記得 ByteBuf 的指針結構嗎?

技術分享圖片

這樣就能節省一些內存了,但這會引起一些內存復制的過程,以性能損耗為前提的。

  1. 記錄 decodeWasNull 屬性,這個值的決定來自於你有沒有成功的向數組中插入數據,如果插入了,它就是 fasle,沒有插入,他就是 true。這個值的作用在於,當 channelRead 方法結束的時候,執行該 decoder 的 channelReadComplete 方法(如果你沒有重寫的話),會判斷這個值:

技術分享圖片

如果是 true,則會判斷 autoRead 屬性,如果是 false 的話,那麽 Netty 認為還有數據沒有讀到,不然數組為什麽一直是空的?就主動調用 read 方法從 Socket 讀取。

  1. 調用 fireChannelRead 方法,嘗試將數組中的數據發送到後面的 handler。為什麽要這麽做。按道理,到這一步的時候,數組不可能是空,為什麽這裏還要這麽謹慎的再發送一次?

答:如果是單次解碼器,就需要發送了,因此單詞解碼器是不會再 callDecode 方法中發送的。

  1. 最後,將數組還給對象池。並清空數組內容。

技術分享圖片

最後一行的 recycler.recycle(this),有兩種結果,如果是 CodecOutputLists 的 recycle 方法,內容如下:

技術分享圖片

恢復數組下標,對 count ++,表示有對象可用了。

還有第二種,當 16 個數組不夠用了,就需要創建一個新的,在 getOrCreate 方法體現。而構造函數中的 recycler 是一個空對象。我們看看這個對象:

技術分享圖片

當調用 recycle 方法的時候,什麽都不做。等待 GC 回收。因為這不是個對象池的引用。

好,到這裏,關於 ByteToMessageDecoder 解碼器的主要功能就解讀完了。

5. 總結

可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這裏使用了模板模式,留給子類擴展的方法就是 decode 方法。

主要邏輯就是將所有的數據全部放入累積區,子類從累積區取出數據進行解碼後放入到一個 數組中,ByteToMessageDecoder 會循環數組調用後面的 handler 方法,將數據一幀幀的發送到業務 handler 。完成這個的解碼邏輯。

使用這種方式,無論是粘包還是拆包,都可以完美的實現。

還有一些小細節:

  1. 比如解碼器可以單次的。
  2. 如果解碼一直不成功,那麽數據就一直無法到達後面的 handler。除非該解碼器從 pipeline 移除。
  3. 像其他的 Netty 模塊一樣,這裏也使用了對象池的概念,數組存放在線程安全的 ThreadLocal 中,默認 16 個,當不夠時,就創建新的,用完即被 GC 回收。
  4. 當數組從未成功添加數據,且程序沒有開啟 autoRead ,就主動調用 read 方法。嘗試讀取數據。

Netty 所有的解碼器,都可以在此類上擴展,一切取決於 decode 的實現。只要遵守 ByteToMessageDecoder 的約定即可。

good luck!!!!

Netty 解碼器抽象父類 ByteToMessageDecoder 源碼解析