Netty原始碼分析第6章(解碼器)---->第1節: ByteToMessageDecoder
Netty原始碼分析第六章: 解碼器
概述:
在我們上一個章節遺留過一個問題, 就是如果Server在讀取客戶端的資料的時候, 如果一次讀取不完整, 就觸發channelRead事件, 那麼Netty是如何處理這類問題的, 在這一章中, 會對此做詳細剖析
之前的章節我們學習過pipeline, 事件在pipeline中傳遞, handler可以將事件擷取並對其處理, 而之後剖析的編解碼器, 其實就是一個handler, 擷取byteBuf中的位元組, 然後組建成業務需要的資料進行繼續傳播
編碼器, 通常是OutBoundHandler, 也就是以自身為基準, 對那些對外流出的資料做處理, 所以也叫編碼器, 將資料經過編碼傳送出去
解碼器, 通常是inboundHandler, 也就是以自身為基準, 對那些流向自身的資料做處理, 所以也叫解碼器, 將對向的資料接收之後經過解碼再進行使用
同樣, 在netty的編碼器中, 也會對半包和粘包問題做相應的處理
什麼是半包, 顧名思義, 就是不完整的資料包, 因為netty在輪詢讀事件的時候, 每次將channel中讀取的資料, 不一定是一個完整的資料包, 這種情況, 就叫半包
粘包同樣也不難理解, 如果client往server傳送資料包, 如果傳送頻繁很有可能會將多個數據包的資料都發送到通道中, 如果在server在讀取的時候可能會讀取到超過一個完整資料包的長度, 這種情況叫粘包
有關半包和粘包, 入下圖所示:
6-0-1
netty對半包的或者粘包的處理其實也很簡單, 通過之前的學習, 我們知道, 每個handler是和channel唯一繫結的, 一個handler只對應一個channel, 所以將channel中的資料讀取時候經過解析, 如果不是一個完整的資料包, 則解析失敗, 將這塊資料包進行儲存, 等下次解析時再和這個資料包進行組裝解析, 直到解析到完整的資料包, 才會將資料包進行向下傳遞
具體流程是在程式碼中如何體現的呢?我們進入到原始碼分析中
第一節: ByteToMessageDecoder
ByteToMessageDecoder解碼器, 顧名思義, 是一個將Byte解析成訊息的解碼器,
我們看他的定義:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
//類體省略
}
這裡繼承了ChannelInboundHandlerAdapter, 根據之前的學習, 我們知道, 這是個inbound型別的handler, 也就是處理流向自身事件的handler
其次, 該類通過abstract關鍵字修飾, 說明是個抽象類, 在我們實際使用的時候, 並不是直接使用這個類, 而是使用其子類, 類定義瞭解碼器的骨架方法, 具體實現邏輯交給子類, 同樣, 在半包處理中也是由該類進行實現的
netty中很多解碼器都實現了這個類, 並且, 我們也可以通過實現該類進行自定義解碼器
我們重點關注一下該類的一個屬性:
ByteBuf cumulation;
這個屬性, 就是有關半包處理的關鍵屬性, 從概述中我們知道, netty會將不完整的資料包進行儲存, 這個資料包就是儲存在這個屬性中
之前的學習我們知道, ByteBuf讀取完資料會傳遞channelRead事件, 傳播過程中會呼叫handler的channelRead方法, ByteToMessageDecoder的channelRead方法, 就是編碼的關鍵部分
我們看其channelRead方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果message是byteBuf型別
if (msg instanceof ByteBuf) {
//簡單當成一個arrayList, 用於盛放解析到的物件
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
//當前累加器為空, 說明這是第一次從io流裡面讀取資料
first = cumulation == null;
if (first) {
//如果是第一次, 則將累加器賦值為剛讀進來的物件
cumulation = data;
} else {
//如果不是第一次, 則把當前累加的資料和讀進來的資料進行累加
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
//呼叫子類的方法進行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
//不是byteBuf型別則向下傳播
ctx.fireChannelRead(msg);
}
}
這方法比較長, 帶大家一步步剖析
首先判斷如果傳來的資料是ByteBuf, 則進入if塊中
CodecOutputList out = CodecOutputList.newInstance() 這裡就當成一個ArrayList就好, 用於盛放解碼完成的資料
ByteBuf data = (ByteBuf) msg 這步將資料轉化成ByteBuf
first = cumulation == null 這裡表示如果cumulation == null, 說明沒有儲存板半包資料, 則將當前的資料儲存在屬性cumulation中
如果 cumulation != null , 說明儲存了半包資料, 則通過cumulator.cumulate(ctx.alloc(), cumulation, data)將讀取到的資料和原來的資料進行累加, 儲存在屬性cumulation中
我們看cumulator屬性:
private Cumulator cumulator = MERGE_CUMULATOR;
這裡呼叫了其靜態屬性MERGE_CUMULATOR, 我們跟過去:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
ByteBuf buffer;
//不能到過最大記憶體
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
//將當前資料buffer
buffer.writeBytes(in);
in.release();
return buffer;
}
};
這裡建立了Cumulator型別的靜態物件, 並重寫了cumulate方法, 這裡cumulate方法, 就是用於將ByteBuf進行拼接的方法:
方法中, 首先判斷cumulation的寫指標+in的可讀位元組數是否超過了cumulation的最大長度, 如果超過了, 將對cumulation進行擴容, 如果沒超過, 則將其賦值到區域性變數buffer中
然後將in的資料寫到buffer中, 將in進行釋放, 返回寫入資料後的ByteBuf
回到channelRead方法中:
最後通過callDecode(ctx, cumulation, out)方法進行解碼, 這裡傳入了Context物件, 緩衝區cumulation和集合out:
我們跟到callDecode(ctx, cumulation, out)方法中:
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
//只要累加器裡面有資料
while (in.isReadable()) {
int outSize = out.size();
//判斷當前List是否有物件
if (outSize > 0) {
//如果有物件, 則向下傳播事件
fireChannelRead(ctx, out, outSize);
//清空當前list
out.clear();
//解碼過程中如ctx被removed掉就break
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//當前可讀資料長度
int oldInputLength = in.readableBytes();
//子類實現
//子類解析, 解析玩物件放到out裡面
decode(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//List解析前大小 和解析後長度一樣(什麼沒有解析出來)
if (outSize == out.size()) {
//原來可讀的長度==解析後可讀長度
//說明沒有讀取資料(當前累加的資料並沒有拼成一個完整的資料包)
if (oldInputLength == in.readableBytes()) {
//跳出迴圈(下次在讀取資料才能進行後續的解析)
break;
} else {
//沒有解析到資料, 但是進行讀取了
continue;
}
}
//out裡面有資料, 但是沒有從累加器讀取資料
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Throwable cause) {
throw new DecoderException(cause);
}
}
這裡首先迴圈判斷傳入的ByteBuf是否有可讀位元組, 如果還有可讀位元組說明沒有解碼完成, 則迴圈繼續解碼
然後判斷集合out的大小, 如果大小大於1, 說明out中盛放了解碼完成之後的資料, 然後將事件向下傳播, 並清空out
因為我們第一次解碼out是空的, 所以這裡不會進入if塊, 這部分我們稍後分析, 這裡繼續往下看
通過 int oldInputLength = in.readableBytes() 獲取當前ByteBuf, 其實也就是屬性cumulation的可讀位元組數, 這裡就是一個備份用於比較, 我們繼續往下看:
decode(ctx, in, out)方法是最終的解碼操作, 這部會讀取cumulation並且將解碼後的資料放入到集合out中, 在ByteToMessageDecoder中的該方法是一個抽象方法, 讓子類進行實現, 我們使用的netty很多的解碼都是繼承了ByteToMessageDecoder並實現了decode方法從而完成了解碼操作, 同樣我們也可以遵循相應的規則進行自定義解碼器, 在之後的小節中會講解netty定義的解碼器, 並剖析相關的實現細節, 這裡我們繼續往下看:
if (outSize == out.size()) 這個判斷表示解析之前的out大小和解析之後out大小進行比較, 如果相同, 說明並沒有解析出資料, 我們進入到if塊中:
if (oldInputLength == in.readableBytes()) 表示cumulation的可讀位元組數在解析之前和解析之後是相同的, 說明解碼方法中並沒有解析資料, 也就是當前的資料並不是一個完整的資料包, 則跳出迴圈, 留給下次解析, 否則, 說明沒有解析到資料, 但是讀取了, 所以跳過該次迴圈進入下次迴圈
最後判斷 if (oldInputLength == in.readableBytes()) , 這裡代表out中有資料, 但是並沒有從cumulation讀資料, 說明這個out的內容是非法的, 直接丟擲異常
我們回到channRead方法中:
我們關注finally中的內容:
finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
//記錄list長度
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
//向下傳播
fireChannelRead(ctx, out, size);
out.recycle();
}
首先判斷cumulation不為null, 並且沒有可讀位元組, 則將累加器進行釋放, 並設定為null
之後記錄out的長度, 通過fireChannelRead(ctx, out, size)將channelRead事件進行向下傳播, 並回收out物件
我們跟到fireChannelRead(ctx, out, size)方法中:
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
//遍歷List
for (int i = 0; i < numElements; i ++) {
//逐個向下傳遞
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
這裡遍歷out集合, 並將裡面的元素逐個向下傳遞
以上就是有關解碼的骨架邏輯