netty原始碼解解析(4.0)-18 ChannelHandler: codec--編解碼框架
編解碼框架和一些常用的實現位於io.netty.handler.codec包中。
編解碼框架包含兩部分:Byte流和特定型別資料之間的編解碼,也叫序列化和反序列化。不型別資料之間的轉換。
下圖是編解碼框架的類繼承體系:
其中MessageToByteEncoder和ByteToMessageDecoder是實現了序列化和反序列化框架。 MessageToMessage是不同型別資料之間轉換的框架。
序列化抽象實現: MessageToByteEncoder<I>
序列化是把 I 型別的資料轉換成Byte流。這個抽象類通過實現ChannelOutboundHandler的write方法在寫資料時把 I 型別的資料轉換成Byte流,下面是write方法的實現:
1 @Override 2 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 3 ByteBuf buf = null; 4 try { 5 if (acceptOutboundMessage(msg)) { 6 @SuppressWarnings("unchecked") 7 I cast = (I) msg; 8 buf = allocateBuffer(ctx, cast, preferDirect); 9 try { 10 encode(ctx, cast, buf); 11 } finally { 12 ReferenceCountUtil.release(cast); 13 } 14 15 if (buf.isReadable()) { 16 ctx.write(buf, promise); 17 } else { 18 buf.release(); 19 ctx.write(Unpooled.EMPTY_BUFFER, promise); 20 } 21 buf = null; 22 } else { 23 ctx.write(msg, promise); 24 } 25 } catch (EncoderException e) { 26 throw e; 27 } catch (Throwable e) { 28 throw new EncoderException(e); 29 } finally { 30 if (buf != null) { 31 buf.release(); 32 } 33 } 34 }
5行, 檢查msg的型別,如果是 I 型別返回true, 否則返回false。
7-10行, 分配一塊buffer, 並呼叫encode方法把msg編碼成Byte流放進這個buffer中。
15-19行,對含有Byte流程資料的buffer繼續執行寫操作。(不清楚寫操作流程的可以參考<<netty原始碼解解析(4.0)-15 Channel NIO實現:寫資料>>)
23行,如果msg不是 I 型別,跳過這個Handler, 繼續執行寫操作。
這裡呼叫的encode方法是一個抽象方法,留給子類實現定製的序列化操作。
反序列化抽象實現: ByteToMessageDecoder
這個抽象型別解決的主要問題是從Byte流中提取資料包。資料包是指剛好可以反序列化成一個特定型別Message的Byte陣列。但是在資料包長度不確定的情況下,沒辦法每次剛好從Byte流中剛好分離一個數據包。每次從Byte流中讀取資料有多種可能:
- 剛好是一個或多個完整的資料包。
- 不足一個完整的資料包,或錯誤的資料。
- 包含一個或多個完整的資料包,但有多餘的資料不足一個完整的資料包或錯誤的資料。
這個問題本質上和"TCP粘包"問題相同。解決這個問題有兩個關鍵點:
- 能夠確定資料包在Byte流中的開始位置和長度。
- 需要暫時快取不完整的資料包,等待後續資料拼接完整。
關於第(1)點,在這個抽象類中沒有處理,只是定義了一個抽象方法decode,留給子類處理。關於第(2)點,這個類定義了一個Cumulator(堆積器)來處理,把不完整的資料包暫時堆積到Cumulator中。Cumulator有兩個實現: MERGE_CUMULATOR(合併堆積器),COMPOSITE_CUMULATOR(組合堆積器)。預設使用的是MERGE_CUMULATOR。下面詳細分析一下這兩種Cumulator的實現。
MERGE_CUMULATOR的實現
這是一個合併堆積器,使用ByteBuf作為堆積緩衝區,把通過把資料寫到堆積緩衝實現新舊資料合併堆積。
1 @Override 2 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { 3 final ByteBuf buffer; 4 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() 5 || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) { 6 // Expand cumulation (by replace it) when either there is not more room in the buffer 7 // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or 8 // duplicate().retain() or if its read-only. 9 // 10 // See: 11 // - https://github.com/netty/netty/issues/2327 12 // - https://github.com/netty/netty/issues/1764 13 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); 14 } else { 15 buffer = cumulation; 16 } 17 buffer.writeBytes(in); 18 in.release(); 19 return buffer; 20 }
4-13行,如果當前的堆積緩衝區不能用了,分配一塊新的,把舊緩衝區中的資料轉移到新緩衝區中,並用新的替換舊的。當前堆積緩衝區不能用的條件是:
cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes(): 容量不夠
或者 cumulation.refCnt() > 1 : 在其他地方本引用
或者 cumulation instanceof ReadOnlyByteBuf 是隻讀的
17行,把資料追加到堆積緩衝區中。
COMPOSITE_CUMULATOR的實現
這是一個合併堆積器,和MERGE_CUMULATOR不同的是他使用的是CompositeByteBuf作為堆積緩衝區。
1 @Override 2 public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { 3 ByteBuf buffer; 4 if (cumulation.refCnt() > 1) { 5 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user 6 // use slice().retain() or duplicate().retain(). 7 // 8 // See: 9 // - https://github.com/netty/netty/issues/2327 10 // - https://github.com/netty/netty/issues/1764 11 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); 12 buffer.writeBytes(in); 13 in.release(); 14 } else { 15 CompositeByteBuf composite; 16 if (cumulation instanceof CompositeByteBuf) { 17 composite = (CompositeByteBuf) cumulation; 18 } else { 19 composite = alloc.compositeBuffer(Integer.MAX_VALUE); 20 composite.addComponent(true, cumulation); 21 } 22 composite.addComponent(true, in); 23 buffer = composite; 24 } 25 return buffer; 26 }
4-13行,和MERGE_CUMULATOR一樣。
15-23行,如果當前的堆積緩衝區不是CompositeByteBuf型別,使用一個新的CompositeByteBuf型別的堆積緩衝區代替,並把資料轉移的新緩衝區中。
分離資料包的主流程
ByteToMessageDecoder是ChannelInboundHandlerAdapter的派生類,它通過覆蓋channelRead實現了反序列化的主流程。這個主流程主要是對堆積緩衝區cumulation的管理,主要步驟是:
- 把Byte流資料追加到cumulation中。
- 呼叫decode方法從cumulation中分離出完整的資料包,並把資料包反序列化成特定型別的資料,直到不能分離資料包為止。
- 檢查cumulation,如果沒有剩餘資料,就銷燬掉這個cumulation。否則,增加讀計數。如果讀計數超過丟棄閾值,丟掉部分資料,這一步是為了防止cumulation中堆積的資料過多。
- 把反序列化得到的Message List傳遞到pipeline中的下一個ChannelInboundHandler處理。
由於使用了cumulation,ByteToMessageDecoder就變成了一個有狀態的ChannelHandler, 它必須是獨佔的,不能使用ChannelHandler.@Sharable註解。
在channelRead中,並沒有直接呼叫decode方法,而是通過callDecode間接呼叫。而callDecdoe也不是直接呼叫,而是呼叫了decodeRemovalReentryProtection方法,這個方法只是對decode呼叫的簡單封裝。引數in是堆積緩衝區cumulation。 這個方法主要實現上面描述的第2個步驟。
1 //在channelRead中呼叫方式:callDecode(ctx, cumulation, out); 2 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 3 try { 4 while (in.isReadable()) { 5 int outSize = out.size(); 6 7 if (outSize > 0) { 8 fireChannelRead(ctx, out, outSize); 9 out.clear(); 10 11 // Check if this handler was removed before continuing with decoding. 12 // If it was removed, it is not safe to continue to operate on the buffer. 13 // 14 // See: 15 // - https://github.com/netty/netty/issues/4635 16 if (ctx.isRemoved()) { 17 break; 18 } 19 outSize = 0; 20 } 21 22 int oldInputLength = in.readableBytes(); 23 decodeRemovalReentryProtection(ctx, in, out); 24 25 // Check if this handler was removed before continuing the loop. 26 // If it was removed, it is not safe to continue to operate on the buffer. 27 // 28 // See https://github.com/netty/netty/issues/1664 29 if (ctx.isRemoved()) { 30 break; 31 } 32 33 if (outSize == out.size()) { 34 if (oldInputLength == in.readableBytes()) { 35 break; 36 } else { 37 continue; 38 } 39 } 40 41 if (oldInputLength == in.readableBytes()) { 42 throw new DecoderException( 43 StringUtil.simpleClassName(getClass()) + 44 ".decode() did not read anything but decoded a message."); 45 } 46 47 if (isSingleDecode()) { 48 break; 49 } 50 } 51 } catch (DecoderException e) { 52 throw e; 53 } catch (Exception cause) { 54 throw new DecoderException(cause); 55 } 56 }
5-19行,如果已經成功分離出了至少一個數據包併成功反序列化,就呼叫fireChannelRead把得到的Message傳遞給pipeline中的下一個Handler處理。fireChannelRead會對out中的每一個Message呼叫一次ctx.fireChannelRead。
22,23行,先記下in中的資料長度,再執行反序列化操作。
33,39行,如果outSize == out.size()(沒有反序列化到新的Message), 且oldInputLength == in.readableBytes()(in中的資料長度沒有變化)表示in中的資料不足以完成一次反序列化操作,跳出迴圈。否則,繼續。
41行,出現了異常,完成了一次反序列化操作,但in中的資料沒變化,憑空多了(或少了)一些反序列化的後Message。
同時可以進行序列化和反序列化的抽象類: ByteToMessageCodec<I>
這個類是ChannelDuplexHandler的派生類,可以同時序列化和反序列化操作。和前面兩個類相比,它沒什麼特別是實現,內部使用MessageToByteEncoder<I>
序列化,使用ByteToMessageDecoder反序列化。
型別轉換編碼的抽象實現: MessageToMessageEncoder<I>
這個類是ChannelOutboundHandlerAdapter的派生類,它在功能是在write過程中,把 I 型別的資料轉換成另一種型別的資料。它定義了抽象方法encode,有子類負責實現具體的轉換操作。
1 @Override 2 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 3 CodecOutputList out = null; 4 try { 5 if (acceptOutboundMessage(msg)) { 6 out = CodecOutputList.newInstance(); 7 @SuppressWarnings("unchecked") 8 I cast = (I) msg; 9 try { 10 encode(ctx, cast, out); 11 } finally { 12 ReferenceCountUtil.release(cast); 13 } 14 15 if (out.isEmpty()) { 16 out.recycle(); 17 out = null; 18 19 throw new EncoderException( 20 StringUtil.simpleClassName(this) + " must produce at least one message."); 21 } 22 } else { 23 ctx.write(msg, promise); 24 } 25 } catch (EncoderException e) { 26 throw e; 27 } catch (Throwable t) { 28 throw new EncoderException(t); 29 } finally { 30 if (out != null) { 31 final int sizeMinusOne = out.size() - 1; 32 if (sizeMinusOne == 0) { 33 ctx.write(out.get(0), promise); 34 } else if (sizeMinusOne > 0) { 35 // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure 36 // See https://github.com/netty/netty/issues/2525 37 ChannelPromise voidPromise = ctx.voidPromise(); 38 boolean isVoidPromise = promise == voidPromise; 39 for (int i = 0; i < sizeMinusOne; i ++) { 40 ChannelPromise p; 41 if (isVoidPromise) { 42 p = voidPromise; 43 } else { 44 p = ctx.newPromise(); 45 } 46 ctx.write(out.getUnsafe(i), p); 47 } 48 ctx.write(out.getUnsafe(sizeMinusOne), promise); 49 } 50 out.recycle(); 51 } 52 } 53 }
6-12行,如果msg是 I 型別的資料,呼叫encode把它轉換成另一種型別。
16-20行,如果沒有轉換成功,丟擲異常。
23行, 如果msg不是 I 型別,跳過當前的Handler。
31-50, 如果轉換成功,把轉換後的資料傳到到下一個Handler處理。33行處理只有一個轉換結果的情況。37-48行處理有多個轉換結果的情況。
型別轉換解碼的抽象實現: MessageToMessageDecoder<I>
這個類是ChannelInboundHandlerAdapter的派生類,它的功能是在read的過程中,把 I 型別的資料轉換成另一種型別的資料。它定義了抽象方法decode,有子類負責實現具體的轉換操作。它的channelRead和上面的類實現相似,但更簡單,這裡就不再分析原始碼了。
型別轉換編解碼的抽象實現: MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
這個類是ChannelDuplexHandler的派生類,它的功能是在write過程中把OUTBOUND_IN型別的資料轉換成INBOUND_IN型別的資料,在read過程中程序相反的操作。它沒有特別的實現,內部使用前面的兩個類實現編解碼。