Netty 解碼器抽象父類 ByteToMessageDecoder 源碼解析
前言
Netty 的解碼器有很多種,比如基於長度的,基於分割符的,私有協議的。但是,總體的思路都是一致的。
拆包思路:當數據滿足了 解碼條件時,將其拆開。放到數組。然後發送到業務 handler 處理。
半包思路: 當讀取的數據不夠時,先存起來,直到滿足解碼條件後,放進數組。送到業務 handler 處理。
而實現這個邏輯的就是我們今天的主角:ByteToMessageDecoder。
看名字的意思是:將字節轉換成消息的解碼器。人如其名。而他本身也是一個入站 handler,所以,我們還是從他的 channelRead 方法入手。
1. channelRead 方法
精簡過的代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 從對象池中取出一個List
CodecOutputList out = CodecOutputList.newInstance();
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 第一次解碼
cumulation = data;// 累計
} else {
// 第二次解碼,就將 data 向 cumulation 追加,並釋放 data
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 得到追加後的 cumulation 後,調用 decode 方法進行解碼
// 解碼過程中,調用 fireChannelRead 方法,主要目的是將累積區的內容 decode 到 數組中
callDecode(ctx, cumulation, out);
// 如果累計區沒有可讀字節了
if (cumulation != null && !cumulation.isReadable()) {
// 將次數歸零
numReads = 0;
// 釋放累計區
cumulation.release();
// 等待 gc
cumulation = null;
} // 如果超過了 16 次,就壓縮累計區,主要是將已經讀過的數據丟棄,將 readIndex 歸零。
else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
// 如果沒有向數組插入過任何數據
decodeWasNull = !out.insertSinceRecycled();
// 循環數組,向後面的 handler 發送數據,如果數組是空,那不會調用
fireChannelRead(ctx, out, size);
// 將數組中的內容清空,將數組的數組的下標恢復至原來
out.recycle();
}
樓主已經在方法中寫了註釋,但還是說說主要的步驟:
- 從對象池中取出一個空的數組。
- 判斷成員變量是否是第一次使用,(註意,既然使用了成員變量,所以這個 handler 不能是 handler 的。)將 unsafe 中傳遞來的數據寫入到這個 cumulation 累積區中。
- 寫到累積區後,調用子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就調用後面節點的 channelRead 方法。若沒有解碼成功,什麽都不做。
- 如果累積區沒有未讀數據了,就釋放累積區。
- 如果還有未讀數據,且解碼超過了 16 次(默認),就對累積區進行壓縮。將讀取過的數據清空,也就是將 readIndex 設置為0.
- 設置 decodeWasNull 的值,如果上一次沒有插入任何數據,這個值就是 ture。該值在 調用 channelReadComplete 方法的時候,會觸發 read 方法(不是自動讀取的話),嘗試從 JDK 的通道中讀取數據,並將之前的邏輯重來。主要應該是怕如果什麽數據都沒有插入,就執行 channelReadComplete 會遺漏數據。
- 調用 fireChannelRead 方法,將數組中的元素發送到後面的 handler 中。
- 將數組清空。並還給對象池。
下面來說說詳細的步驟。
2. 從對象池中取出一個空的數組
代碼:
@1
CodecOutputList out = CodecOutputList.newInstance();
@2
static CodecOutputList newInstance() {
return CODEC_OUTPUT_LISTS_POOL.get().getOrCreate();
}
@3
private static final FastThreadLocal<CodecOutputLists> CODEC_OUTPUT_LISTS_POOL =
new FastThreadLocal<CodecOutputLists>() {
@Override
protected CodecOutputLists initialValue() throws Exception {
// 16 CodecOutputList per Thread are cached.
return new CodecOutputLists(16);
}
};
@4
CodecOutputLists(int numElements) {
elements = new CodecOutputList[MathUtil.safeFindNextPositivePowerOfTwo(numElements)];
for (int i = 0; i < elements.length; ++i) {
// Size of 16 should be good enough for the majority of all users as an initial capacity.
elements[i] = new CodecOutputList(this, 16);
}
count = elements.length;
currentIdx = elements.length;
mask = elements.length - 1;
}
@5
private CodecOutputList(CodecOutputListRecycler recycler, int size) {
this.recycler = recycler;
array = new Object[size];
}
@6
public CodecOutputList getOrCreate() {
if (count == 0) {
// Return a new CodecOutputList which will not be cached. We use a size of 4 to keep the overhead
// low.
return new CodecOutputList(NOOP_RECYCLER, 4);
}
--count;
int idx = (currentIdx - 1) & mask;
CodecOutputList list = elements[idx];
currentIdx = idx;
return list;
}
代碼分為 1,2,3,4,5, 6 步驟。
- 靜態方法調用。
- 從 FastThreadLocal 中取出一個 CodecOutputLists 對象,並從這個集合中再取出一個 List。也就是 List 中有 List。可以理解為雙重數組。
- 調用 FastThreadLocal 的 initialValue 方法返回一個 CodecOutputLists 對象。
- 創建數組。數組大小默認16,循環填充 CodecOutputList 元素。設置 count,currentIdx ,mask 屬性。
- 創建 CodecOutputList 對象,這個 recycler 就是他的父 CodecOutputLists,並創建一個默認 16 的空數組。
- 首次進入 count 不是0,應該是 16,隨後將 count -1,並與運算出 Lists 中的下標,獲取到下標的內容。也就是一個 List。在調用 recycle 方法還給對象池的時候,會將所有參數恢復。
由於這個 getOrCreate 方法會被一個線程的多個地方使用,因此 16 是個統計值。當 16 不夠的時候,就會創建一個新的 List。也就是 count == 0 的邏輯。而 & mask 的操作就是一個取模的操作。
3. 寫入累積區
代碼如下:
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
這個 cumulator 默認是個 Cumulator 類型的 MERGE_CUMULATOR,該實例最主要的是從重寫了 cumulate 方法:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
可以看到該方法,主要是將 unsafe.read 傳遞過來的 ByteBuf 的內容寫入到 cumulation 累積區中,然後釋放掉舊的內容,由於這個變量是成員變量,因此可以多次調用 channelRead 方法寫入。
同時這個方法也考慮到了擴容的問題,總的來說就是 copy。
當然,ByteToMessageDecoder 中還有一個 Cumulator 實例,稱之為 COMPOSITE_CUMULATOR,混合累積。由於上個實例的 cumulate 方法是使用內存拷貝的,因此,這裏提供了使用混合內存。相較於拷貝,性能會更好點,但同時也會更復雜。
4. decode 方法的作用
當數據追擊到累積區之後,需要調用 decode 方法進行解碼,代碼如下:
@ 1
callDecode(ctx, cumulation, out);
@2
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 如果累計區還有可讀字節
while (in.isReadable()) {
int outSize = out.size();
// 上次循環成功解碼
if (outSize > 0) {
// 調用後面的業務 handler 的 ChannelRead 方法
fireChannelRead(ctx, out, outSize);
// 將 size 置為0
out.clear();//
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
// 得到可讀字節數
int oldInputLength = in.readableBytes();
// 調用 decode 方法,將成功解碼後的數據放入道 out 數組中,可能會刪除當前節點,刪除之前會將數據發送到最後的 handler
decodeRemovalReentryProtection(ctx, in, out);// decode()
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (isSingleDecode()) {
break;
}
}
}
該方法主要邏輯:只要累積區還有未讀數據,就循環進行讀取。
調用 decodeRemovalReentryProtection 方法,內部調用了子類重寫的 decode 方法,很明顯,這裏是個模板模式。decode 方法的邏輯就是將累積區的內容按照約定進行解碼,如果成功解碼,就添加到數組中。同時該方法也會檢查該 handler 的狀態,如果被移除出 pipeline 了,就將累積區的內容直接刷新到後面的 handler 中。
如果 Context 節點被移除了,直接結束循環。如果解碼前的數組大小和解碼後的數組大小相等,且累積區的可讀字節數沒有變化,說明此次讀取什麽都沒做,就直接結束。如果字節數變化了,說明雖然數組沒有增加,但確實在讀取字節,就再繼續讀取。
如果上面的判斷過了,說明數組讀到數據了,但如果累積區的 readIndex 沒有變化,則拋出異常,說明沒有讀取數據,但數組卻增加了,子類的操作是不對的。
如果是個單次解碼器,解碼一次就直接結束了。
所以,這段代碼的關鍵就是子類需要重寫 decode 方法,將累積區的數據正確的解碼並添加到數組中。每添加一次成功,就會調用 fireChannelRead 方法,將數組中的數據傳遞給後面的 handler。完成之後將數組的 size 設置為 0.
所以,如果你的業務 handler 在這個地方可能會被多次調用。也可能一次也不調用。取決於數組中的值。當然,如果解碼 handler 被移除了,就會將累積區的所有數據刷到後面的 handler。
5. 剩下的邏輯
上面的邏輯就是解碼器最主要的邏輯:
將 read 方法的數據讀取到累積區,使用解碼器解碼累積區的數據,解碼成功一個就放入到一個數組中,並將數組中的數據一次次的傳遞到後面的handler。
從上面的邏輯看,除非 handler 被移除,否則不會調用後面的 handler 方法,也就是說,只要不滿足解碼器的解碼規則,就不會傳遞給後面的 handler。
再看看後面的邏輯,主要在 finally 塊中:
- 如果累積區沒有可讀數據了,將計數器歸零,並釋放累積區。
- 如果不滿足上面的條件,且計數器超過了 16 次,就壓縮累積區的內容,壓縮手段是刪除已讀的數據。將 readIndex 置為 0。還記得 ByteBuf 的指針結構嗎?
這樣就能節省一些內存了,但這會引起一些內存復制的過程,以性能損耗為前提的。
- 記錄 decodeWasNull 屬性,這個值的決定來自於你有沒有成功的向數組中插入數據,如果插入了,它就是 fasle,沒有插入,他就是 true。這個值的作用在於,當 channelRead 方法結束的時候,執行該 decoder 的 channelReadComplete 方法(如果你沒有重寫的話),會判斷這個值:
如果是 true,則會判斷 autoRead 屬性,如果是 false 的話,那麽 Netty 認為還有數據沒有讀到,不然數組為什麽一直是空的?就主動調用 read 方法從 Socket 讀取。
- 調用 fireChannelRead 方法,嘗試將數組中的數據發送到後面的 handler。為什麽要這麽做。按道理,到這一步的時候,數組不可能是空,為什麽這裏還要這麽謹慎的再發送一次?
答:如果是單次解碼器,就需要發送了,因此單詞解碼器是不會再 callDecode 方法中發送的。
- 最後,將數組還給對象池。並清空數組內容。
最後一行的 recycler.recycle(this),有兩種結果,如果是 CodecOutputLists 的 recycle 方法,內容如下:
恢復數組下標,對 count ++,表示有對象可用了。
還有第二種,當 16 個數組不夠用了,就需要創建一個新的,在 getOrCreate 方法體現。而構造函數中的 recycler 是一個空對象。我們看看這個對象:
當調用 recycle 方法的時候,什麽都不做。等待 GC 回收。因為這不是個對象池的引用。
好,到這裏,關於 ByteToMessageDecoder 解碼器的主要功能就解讀完了。
5. 總結
可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這裏使用了模板模式,留給子類擴展的方法就是 decode 方法。
主要邏輯就是將所有的數據全部放入累積區,子類從累積區取出數據進行解碼後放入到一個 數組中,ByteToMessageDecoder 會循環數組調用後面的 handler 方法,將數據一幀幀的發送到業務 handler 。完成這個的解碼邏輯。
使用這種方式,無論是粘包還是拆包,都可以完美的實現。
還有一些小細節:
- 比如解碼器可以單次的。
- 如果解碼一直不成功,那麽數據就一直無法到達後面的 handler。除非該解碼器從 pipeline 移除。
- 像其他的 Netty 模塊一樣,這裏也使用了對象池的概念,數組存放在線程安全的 ThreadLocal 中,默認 16 個,當不夠時,就創建新的,用完即被 GC 回收。
- 當數組從未成功添加數據,且程序沒有開啟 autoRead ,就主動調用 read 方法。嘗試讀取數據。
Netty 所有的解碼器,都可以在此類上擴展,一切取決於 decode 的實現。只要遵守 ByteToMessageDecoder 的約定即可。
good luck!!!!
Netty 解碼器抽象父類 ByteToMessageDecoder 源碼解析