1. 程式人生 > >netty原始碼解解析(4.0)-18 ChannelHandler: codec--編解碼框架

  其中MessageToByteEncoder和ByteToMessageDecoder是實現了序列化和反序列化框架。 MessageToMessage是不同型別資料之間轉換的框架。  


序列化抽象實現: MessageToByteEncoder<I>

  序列化是把 I 型別的資料轉換成Byte流。這個抽象類通過實現ChannelOutboundHandler的write方法在寫資料時把 I 型別的資料轉換成Byte流,下面是write方法的實現:

 1     @Override
 2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 3         ByteBuf buf = null;
 4         try {
 5             if (acceptOutboundMessage(msg)) {
 6                 @SuppressWarnings("unchecked")
 7                 I cast = (I) msg;
 8                 buf = allocateBuffer(ctx, cast, preferDirect);
 9                 try {
10                     encode(ctx, cast, buf);
11                 } finally {
12                     ReferenceCountUtil.release(cast);
13                 }
15                 if (buf.isReadable()) {
16                     ctx.write(buf, promise);
17                 } else {
18                     buf.release();
19                     ctx.write(Unpooled.EMPTY_BUFFER, promise);
20                 }
21                 buf = null;
22             } else {
23                 ctx.write(msg, promise);
24             }
25         } catch (EncoderException e) {
26             throw e;
27         } catch (Throwable e) {
28             throw new EncoderException(e);
29         } finally {
30             if (buf != null) {
31                 buf.release();
32             }
33         }
34     }

  5行,  檢查msg的型別,如果是 I 型別返回true, 否則返回false。

  7-10行, 分配一塊buffer, 並呼叫encode方法把msg編碼成Byte流放進這個buffer中。

  15-19行,對含有Byte流程資料的buffer繼續執行寫操作。(不清楚寫操作流程的可以參考<<netty原始碼解解析(4.0)-15 Channel NIO實現:寫資料>>)

    23行,如果msg不是 I 型別,跳過這個Handler, 繼續執行寫操作。



反序列化抽象實現: ByteToMessageDecoder


  1.  剛好是一個或多個完整的資料包。
  2.  不足一個完整的資料包,或錯誤的資料。
  3.  包含一個或多個完整的資料包,但有多餘的資料不足一個完整的資料包或錯誤的資料。  


  1.  能夠確定資料包在Byte流中的開始位置和長度。
  2.  需要暫時快取不完整的資料包,等待後續資料拼接完整。

  關於第(1)點,在這個抽象類中沒有處理,只是定義了一個抽象方法decode,留給子類處理。關於第(2)點,這個類定義了一個Cumulator(堆積器)來處理,把不完整的資料包暫時堆積到Cumulator中。Cumulator有兩個實現: MERGE_CUMULATOR(合併堆積器),COMPOSITE_CUMULATOR(組合堆積器)。預設使用的是MERGE_CUMULATOR。下面詳細分析一下這兩種Cumulator的實現。



 1 @Override
 2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
 3             final ByteBuf buffer;
 4             if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
 5                     || cumulation.refCnt() > 1 || cumulation instanceof ReadOnlyByteBuf) {
 6                 // Expand cumulation (by replace it) when either there is not more room in the buffer
 7                 // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
 8                 // duplicate().retain() or if its read-only.
 9                 //
10                 // See:
11                 // - https://github.com/netty/netty/issues/2327
12                 // - https://github.com/netty/netty/issues/1764
13                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
14             } else {
15                 buffer = cumulation;
16             }
17             buffer.writeBytes(in);
18             in.release();
19             return buffer;
20         }


    cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes(): 容量不夠

    或者 cumulation.refCnt() > 1 : 在其他地方本引用

            或者 cumulation instanceof ReadOnlyByteBuf 是隻讀的





 1        @Override
 2         public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
 3             ByteBuf buffer;
 4             if (cumulation.refCnt() > 1) {
 5                 // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user
 6                 // use slice().retain() or duplicate().retain().
 7                 //
 8                 // See:
 9                 // - https://github.com/netty/netty/issues/2327
10                 // - https://github.com/netty/netty/issues/1764
11                 buffer = expandCumulation(alloc, cumulation, in.readableBytes());
12                 buffer.writeBytes(in);
13                 in.release();
14             } else {
15                 CompositeByteBuf composite;
16                 if (cumulation instanceof CompositeByteBuf) {
17                     composite = (CompositeByteBuf) cumulation;
18                 } else {
19                     composite = alloc.compositeBuffer(Integer.MAX_VALUE);
20                     composite.addComponent(true, cumulation);
21                 }
22                 composite.addComponent(true, in);
23                 buffer = composite;
24             }
25             return buffer;
26         }






  1. 把Byte流資料追加到cumulation中。
  2. 呼叫decode方法從cumulation中分離出完整的資料包,並把資料包反序列化成特定型別的資料,直到不能分離資料包為止。
  3. 檢查cumulation,如果沒有剩餘資料,就銷燬掉這個cumulation。否則,增加讀計數。如果讀計數超過丟棄閾值,丟掉部分資料,這一步是為了防止cumulation中堆積的資料過多。
  4. 把反序列化得到的Message List傳遞到pipeline中的下一個ChannelInboundHandler處理。

  由於使用了cumulation,ByteToMessageDecoder就變成了一個有狀態的ChannelHandler, 它必須是獨佔的,不能使用ChannelHandler.@Sharable註解。

  在channelRead中,並沒有直接呼叫decode方法,而是通過callDecode間接呼叫。而callDecdoe也不是直接呼叫,而是呼叫了decodeRemovalReentryProtection方法,這個方法只是對decode呼叫的簡單封裝。引數in是堆積緩衝區cumulation。 這個方法主要實現上面描述的第2個步驟。

 1 //在channelRead中呼叫方式:callDecode(ctx, cumulation, out);    
 2 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
 3         try {
 4             while (in.isReadable()) {
 5                 int outSize = out.size();
 7                 if (outSize > 0) {
 8                     fireChannelRead(ctx, out, outSize);
 9                     out.clear();
11                     // Check if this handler was removed before continuing with decoding.
12                     // If it was removed, it is not safe to continue to operate on the buffer.
13                     //
14                     // See:
15                     // - https://github.com/netty/netty/issues/4635
16                     if (ctx.isRemoved()) {
17                         break;
18                     }
19                     outSize = 0;
20                 }
22                 int oldInputLength = in.readableBytes();
23                 decodeRemovalReentryProtection(ctx, in, out);
25                 // Check if this handler was removed before continuing the loop.
26                 // If it was removed, it is not safe to continue to operate on the buffer.
27                 //
28                 // See https://github.com/netty/netty/issues/1664
29                 if (ctx.isRemoved()) {
30                     break;
31                 }
33                 if (outSize == out.size()) {
34                     if (oldInputLength == in.readableBytes()) {
35                         break;
36                     } else {
37                         continue;
38                     }
39                 }
41                 if (oldInputLength == in.readableBytes()) {
42                     throw new DecoderException(
43                             StringUtil.simpleClassName(getClass()) +
44                                     ".decode() did not read anything but decoded a message.");
45                 }
47                 if (isSingleDecode()) {
48                     break;
49                 }
50             }
51         } catch (DecoderException e) {
52             throw e;
53         } catch (Exception cause) {
54             throw new DecoderException(cause);
55         }
56     }



  33,39行,如果outSize == out.size()(沒有反序列化到新的Message), 且oldInputLength == in.readableBytes()(in中的資料長度沒有變化)表示in中的資料不足以完成一次反序列化操作,跳出迴圈。否則,繼續。



 同時可以進行序列化和反序列化的抽象類: ByteToMessageCodec<I>




型別轉換編碼的抽象實現: MessageToMessageEncoder<I>

  這個類是ChannelOutboundHandlerAdapter的派生類,它在功能是在write過程中,把 I 型別的資料轉換成另一種型別的資料。它定義了抽象方法encode,有子類負責實現具體的轉換操作。

 1     @Override
 2     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
 3         CodecOutputList out = null;
 4         try {
 5             if (acceptOutboundMessage(msg)) {
 6                 out = CodecOutputList.newInstance();
 7                 @SuppressWarnings("unchecked")
 8                 I cast = (I) msg;
 9                 try {
10                     encode(ctx, cast, out);
11                 } finally {
12                     ReferenceCountUtil.release(cast);
13                 }
15                 if (out.isEmpty()) {
16                     out.recycle();
17                     out = null;
19                     throw new EncoderException(
20                             StringUtil.simpleClassName(this) + " must produce at least one message.");
21                 }
22             } else {
23                 ctx.write(msg, promise);
24             }
25         } catch (EncoderException e) {
26             throw e;
27         } catch (Throwable t) {
28             throw new EncoderException(t);
29         } finally {
30             if (out != null) {
31                 final int sizeMinusOne = out.size() - 1;
32                 if (sizeMinusOne == 0) {
33                     ctx.write(out.get(0), promise);
34                 } else if (sizeMinusOne > 0) {
35                     // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
36                     // See https://github.com/netty/netty/issues/2525
37                     ChannelPromise voidPromise = ctx.voidPromise();
38                     boolean isVoidPromise = promise == voidPromise;
39                     for (int i = 0; i < sizeMinusOne; i ++) {
40                         ChannelPromise p;
41                         if (isVoidPromise) {
42                             p = voidPromise;
43                         } else {
44                             p = ctx.newPromise();
45                         }
46                         ctx.write(out.getUnsafe(i), p);
47                     }
48                     ctx.write(out.getUnsafe(sizeMinusOne), promise);
49                 }
50                 out.recycle();
51             }
52         }
53     }

  6-12行,如果msg是 I 型別的資料,呼叫encode把它轉換成另一種型別。


  23行, 如果msg不是 I 型別,跳過當前的Handler。

  31-50, 如果轉換成功,把轉換後的資料傳到到下一個Handler處理。33行處理只有一個轉換結果的情況。37-48行處理有多個轉換結果的情況。


型別轉換解碼的抽象實現: MessageToMessageDecoder<I>

  這個類是ChannelInboundHandlerAdapter的派生類,它的功能是在read的過程中,把 I 型別的資料轉換成另一種型別的資料。它定義了抽象方法decode,有子類負責實現具體的轉換操作。它的channelRead和上面的類實現相似,但更簡單,這裡就不再分析原始碼了。


型別轉換編解碼的抽象實現: MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
