Netty 編解碼奧祕
Netty中編解碼
Netty 的解碼器有很多種,比如基於長度的,基於分割符的,私有協議的。但是,總體的思路都是一致的。
拆包思路:當資料滿足了 解碼條件時,將其拆開。放到陣列。然後傳送到業務 handler 處理。
半包思路: 當讀取的資料不夠時,先存起來,直到滿足解碼條件後,放進陣列。送到業務 handler 處理。
拆包的原理
在沒有netty的情況下,使用者如果自己需要拆包,基本原理就是不斷從TCP緩衝區中讀取資料,每次讀取完都需要判斷是否是一個完整的資料包
- 如果當前讀取的資料不足以拼接成一個完整的業務資料包,那就保留該資料,繼續從tcp緩衝區中讀取,直到得到一個完整的資料包
- 如果當前讀到的資料加上已經讀取的資料足夠拼接成一個數據包,那就將已經讀取的資料拼接上本次讀取的資料,夠成一個完整的業務資料包傳遞到業務邏輯,多餘的資料仍然保留,以便和下次讀到的資料嘗試拼接
Netty中拆包的基類
Netty提供的解碼器都繼承於ByteToMessageDecoder,下面是ByteToMessageDecoder的uml圖
netty 中的拆包也是如上這個原理,在每個SocketChannel中會一個 pipeline ,pipeline 內部會加入解碼器,解碼器都繼承基類 ByteToMessageDecoder,其內部會有一個累加器,每次從當前SocketChannel讀取到資料都會不斷累加,然後嘗試對累加到的資料進行拆包,拆成一個完整的業務資料包,下面我們先詳細分析下這個類
看名字的意思是:將位元組轉換成訊息的解碼器。人如其名。而他本身也是一個入站 handler,所以,我們還是從他的 channelRead 方法入手。
channelRead 方法
我們先看看基類中的屬性,cumulation是此基類中的一個 ByteBuf 型別的累積區,每次從當前SocketChannel讀取到資料都會不斷累加,然後嘗試對累加到的資料進行拆包,拆成一個完整的業務資料包,如果不夠一個完整的資料包,則等待下一次從TCP的資料到來,繼續累加到此cumulation中
publicabstractclassByteToMessageDecoderextendsChannelInboundHandlerAdapter{
//累積區
ByteBufcumulation;
privateByteToMessageDecoder.Cumulatorcumulator;
privatebooleansingleDecode;
privatebooleandecodeWasNull;
privatebooleanfirst;
privateintdiscardAfterReads;
privateintnumReads;
.
.
.
}
channelRead方法是每次從TCP緩衝區讀到資料都會呼叫的方法,觸發點在AbstractNioByteChannel的read方法中,裡面有個while迴圈不斷讀取,讀取到一次就觸發一次channelRead
@Override
publicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{
if(msginstanceofByteBuf){
//從物件池中取出一個List
CodecOutputListout=CodecOutputList.newInstance();
try{
ByteBufdata=(ByteBuf)msg;
first=cumulation==null;
if(first){
//第一次解碼
cumulation=data;//直接賦值
}else{
//第二次解碼,就將data向cumulation追加,並釋放data
cumulation=cumulator.cumulate(ctx.alloc(),cumulation,data);
}
//得到追加後的cumulation後,呼叫decode方法進行解碼
//主要目的是將累積區cumulation的內容decode到out陣列中
callDecode(ctx,cumulation,out);
}catch(DecoderExceptione){
throwe;
}catch(Throwablet){
thrownewDecoderException(t);
}finally{
//如果累計區沒有可讀位元組了,有可能在上面callDecode方法中已經將cumulation全部讀完了,此時writerIndex==readerIndex
//每讀一個位元組,readerIndex會+1
if(cumulation!=null&&!cumulation.isReadable()){
//將次數歸零
numReads=0;
//釋放累計區,因為累計區裡面的位元組都全部讀完了
cumulation.release();
//便於gc
cumulation=null;
//如果超過了16次,還有位元組沒有讀完,就將已經讀過的資料丟棄,將readIndex歸零。
}elseif(++numReads>=discardAfterReads){
numReads=0;
//將已經讀過的資料丟棄,將readIndex歸零。
discardSomeReadBytes();
}
intsize=out.size();
decodeWasNull=!out.insertSinceRecycled();
//迴圈陣列,向後面的handler傳送資料
fireChannelRead(ctx,out,size);
out.recycle();
}
}else{
ctx.fireChannelRead(msg);
}
}
- 從物件池中取出一個空的陣列。
- 判斷成員變數是否是第一次使用,將 unsafe 中傳遞來的資料寫入到這個 cumulation 累積區中。
- 寫到累積區後,在callDecode方法中呼叫子類的 decode 方法,嘗試將累積區的內容解碼,每成功解碼一個,就呼叫後面節點的 channelRead 方法。若沒有解碼成功,什麼都不做。
- 如果累積區沒有未讀資料了,就釋放累積區。
- 如果還有未讀資料,且解碼超過了 16 次(預設),就對累積區進行壓縮。將讀取過的資料清空,也就是將 readIndex 設定為0.
- 呼叫 fireChannelRead 方法,將陣列中的元素髮送到後面的 handler 中。
- 將陣列清空。並還給物件池。
下面來說說詳細的步驟。
寫入累積區
如果當前累加器沒有資料,就直接跳過記憶體拷貝,直接將位元組容器的指標指向新讀取的資料,否則,呼叫累加器累加資料至位元組容器
ByteBufdata=(ByteBuf)msg;
first=cumulation==null;
if(first){
cumulation=data;
}else{
cumulation=cumulator.cumulate(ctx.alloc(),cumulation,data);
}
我們看看構造方法
protectedByteToMessageDecoder(){
this.cumulator=MERGE_CUMULATOR;
this.discardAfterReads=16;
CodecUtil.ensureNotSharable(this);
}
可以看到 this.cumulator = MERGE_CUMULATOR;,那我們接下來看看 MERGE_CUMULATOR
publicstaticfinalByteToMessageDecoder.CumulatorMERGE_CUMULATOR=newByteToMessageDecoder.Cumulator(){
publicByteBufcumulate(ByteBufAllocatoralloc,ByteBufcumulation,ByteBufin){
ByteBufbuffer;
if(cumulation.writerIndex()<=cumulation.maxCapacity()-in.readableBytes()&&cumulation.refCnt()<=1){
buffer=cumulation;
}else{
buffer=ByteToMessageDecoder.expandCumulation(alloc,cumulation,in.readableBytes());
}
buffer.writeBytes(in);
in.release();
returnbuffer;
}
};
MERGE_CUMULATOR
是基類ByteToMessageDecoder中的一個靜態常量,其重寫了cumulate方法,下面我們看一下 MERGE_CUMULATO
R 是如何將新讀取到的資料累加到位元組容器裡的
netty 中ByteBuf的抽象,使得累加非常簡單,通過一個簡單的api呼叫 buffer.writeBytes(in); 便將新資料累加到位元組容器中,為了防止位元組容器大小不夠,在累加之前還進行了擴容處理
staticByteBufexpandCumulation(ByteBufAllocatoralloc,ByteBufcumulation,intreadable){
ByteBufoldCumulation=cumulation;
cumulation=alloc.buffer(oldCumulation.readableBytes()+readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
returncumulation;
}
擴容也是一個記憶體拷貝操作,新增的大小即是新讀取資料的大小
將累加到的資料傳遞給業務進行拆包
當資料追加到累積區之後,需要呼叫 decode 方法進行解碼,程式碼如下:
publicbooleanisReadable(){
//寫的座標大於讀的座標則說明還有資料可讀
returnthis.writerIndex>this.readerIndex;
}
protectedvoidcallDecode(ChannelHandlerContextctx,ByteBufin,List<Object>out){
//如果累計區還有可讀位元組,迴圈解碼,因為這裡in有可能是粘包,即多次完整的資料包粘在一起,通過換行符連線
//下面的decode方法只能處理一個完整的資料包,所以這裡迴圈處理粘包
while(in.isReadable()){
intoutSize=out.size();
//上次迴圈成功解碼
if(outSize>0){
//處理一個粘包就呼叫一次後面的業務handler的ChannelRead方法
fireChannelRead(ctx,out,outSize);
//將size置為0
out.clear();//
if(ctx.isRemoved()){
break;
}
outSize=0;
}
//得到可讀位元組數
intoldInputLength=in.readableBytes();
//呼叫decode方法,將成功解碼後的資料放入道out陣列中
decode(ctx,in,out);
if(ctx.isRemoved()){
break;
}
if(outSize==out.size()){
if(oldInputLength==in.readableBytes()){
break;
}else{
continue;
}
}
if(isSingleDecode()){
break;
}
}
}
我們看看 fireChannelRead
staticvoidfireChannelRead(ChannelHandlerContextctx,List<Object>msgs,intnumElements){
if(msgsinstanceofCodecOutputList){
fireChannelRead(ctx,(CodecOutputList)msgs,numElements);
}else{
//將所有已解碼的資料向下業務hadder傳遞
for(inti=0;i<numElements;++i){
ctx.fireChannelRead(msgs.get(i));
}
}
}
該方法主要邏輯:只要累積區還有未讀資料,就迴圈進行讀取。
- 呼叫 decodeRemovalReentryProtection 方法,內部呼叫了子類重寫的 decode 方法,很明顯,這裡是個模板模式。decode 方法的邏輯就是將累積區的內容按照約定進行解碼,如果成功解碼,就新增到陣列中。同時該方法也會檢查該 handler 的狀態,如果被移除出 pipeline 了,就將累積區的內容直接重新整理到後面的 handler 中。
- 如果 Context 節點被移除了,直接結束迴圈。如果解碼前的陣列大小和解碼後的陣列大小相等,且累積區的可讀位元組數沒有變化,說明此次讀取什麼都沒做,就直接結束。如果位元組數變化了,說明雖然陣列沒有增加,但確實在讀取位元組,就再繼續讀取。
- 如果上面的判斷過了,說明陣列讀到資料了,但如果累積區的 readIndex 沒有變化,則丟擲異常,說明沒有讀取資料,但陣列卻增加了,子類的操作是不對的。
- 如果是個單次解碼器,解碼一次就直接結束了,如果資料包一次就解碼完了,則下一次迴圈時 in.isReadable()就為false,因為 writerIndex = this.readerIndex 了
所以,這段程式碼的關鍵就是子類需要重寫 decode 方法,將累積區的資料正確的解碼並新增到陣列中。每新增一次成功,就會呼叫 fireChannelRead 方法,將陣列中的資料傳遞給後面的 handler。完成之後將陣列的 size 設定為 0.
所以,如果你的業務 handler 在這個地方可能會被多次呼叫。也可能一次也不呼叫。取決於陣列中的值。
「解碼器最主要的邏輯」:
❝將 read 方法的資料讀取到累積區,使用解碼器解碼累積區的資料,解碼成功一個就放入到一個數組中,並將陣列中的資料一次次的傳遞到後面的handler。
❞
清理位元組容器
業務拆包完成之後,只是從累積區中取走了資料,但是這部分空間對於累積區來說依然保留著,而位元組容器每次累加位元組資料的時候都是將位元組資料追加到尾部,如果不對累積區做清理,那麼時間一長就會OOM,清理部分的程式碼如下
finally{
//如果累計區沒有可讀位元組了,有可能在上面callDecode方法中已經將cumulation全部讀完了,此時writerIndex==readerIndex
//每讀一個位元組,readerIndex會+1
if(cumulation!=null&&!cumulation.isReadable()){
//將次數歸零
numReads=0;
//釋放累計區,因為累計區裡面的位元組都全部讀完了
cumulation.release();
//便於gc
cumulation=null;
//如果超過了16次,還有位元組沒有讀完,就將已經讀過的資料丟棄,將readIndex歸零。
}elseif(++numReads>=discardAfterReads){
numReads=0;
//將已經讀過的資料丟棄,將readIndex歸零。
discardSomeReadBytes();
}
intsize=out.size();
decodeWasNull=!out.insertSinceRecycled();
//迴圈陣列,向後面的handler傳送資料
fireChannelRead(ctx,out,size);
out.recycle();
}
- 如果累積區沒有可讀資料了,將計數器歸零,並釋放累積區。
- 如果不滿足上面的條件,且計數器超過了 16 次,就壓縮累積區的內容,壓縮手段是刪除已讀的資料。將 readIndex 置為 0。還記得 ByteBuf 的指標結構嗎?
ByteBuf
publicByteBufdiscardSomeReadBytes(){
this.ensureAccessible();
if(this.readerIndex==0){
returnthis;
}elseif(this.readerIndex==this.writerIndex){
this.adjustMarkers(this.readerIndex);
this.writerIndex=this.readerIndex=0;
returnthis;
}else{
//讀指標超過了Buffer容量的一半時做清理工作
if(this.readerIndex>=this.capacity()>>>1){
//拷貝,從readerIndex開始,拷貝this.writerIndex-this.readerIndex長度
this.setBytes(0,this,this.readerIndex,this.writerIndex-this.readerIndex);
//writerIndex=writerIndex-readerIndex
this.writerIndex-=this.readerIndex;
this.adjustMarkers(this.readerIndex);
//將讀指標重置為0
this.readerIndex=0;
}
returnthis;
}
}
我們看到discardSomeReadBytes 主要是將未讀的資料拷貝到原Buffer,重置 readerIndex 和 writerIndex
我們看到最後還呼叫 fireChannelRead 方法,嘗試將陣列中的資料傳送到後面的 handler。為什麼要這麼做。按道理,到這一步的時候,陣列不可能是空,為什麼這裡還要這麼謹慎的再發送一次?
如果是單次解碼器,就需要傳送了,因為單詞解碼器是不會在 callDecode 方法中傳送的。
總結
可以說,ByteToMessageDecoder 是解碼器的核心所做,Netty 在這裡使用了模板模式,留給子類擴充套件的方法就是 decode 方法。
主要邏輯就是將所有的資料全部放入累積區,子類從累積區取出資料進行解碼後放入到一個 陣列中,ByteToMessageDecoder 會迴圈陣列呼叫後面的 handler 方法,將資料一幀幀的傳送到業務 handler 。完成這個的解碼邏輯。
使用這種方式,無論是粘包還是拆包,都可以完美的實現。
Netty 所有的解碼器,都可以在此類上擴充套件,一切取決於 decode 的實現。只要遵守 ByteToMessageDecoder 的約定即可。下篇文章給大家列舉常用的解碼器。
結束
❝識別下方二維碼!回覆: 「
❞入群
」 ,掃碼加入我們交流群!
點贊是認可,在看是支援
歡迎關注我的公眾號!裡面可以加入微信技術交流群!