Netty之ByteToMessageDecoder
阿新 • • 發佈:2018-12-12
Netty中所有的解碼器都是基於ByteToMessageDecoder來實現的,他的實現原理如下: 1、累加位元組流 2、呼叫子類的decode方法進行解析 3、將解析得到ByteBuf向下傳播
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //如果當前傳進來的物件是ByteBuf型別的,那麼就直接丟給解碼器進行處理,否則向下傳播 if (msg instanceof ByteBuf) { //例項化一個list CodecOutputList out = CodecOutputList.newInstance(); try { //將物件強制轉換成bytebuf ByteBuf data = (ByteBuf) msg; first = cumulation == null; //如果是null,說明是剛開始解析,直接把二進位制字串流賦值 if (first) { cumulation = data; } else { //否則將二進位制字串流進行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //呼叫解碼器進行累加 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); //將解析出來的物件的集合向下傳播 fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
這個方法裡面總共做了這麼幾件事情: 1、將傳進來的二進位制字串流進行累加 2、呼叫解碼器解碼累加後的二進位制字串 3、向下傳播解析出來的物件 下面我們分別看一下每個步驟是怎麼來實現的:
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; //檢查記憶體空間大小是否足夠,來選擇是否擴容 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //將新的到的二進位制字串流寫進ByteBuf進行累加並返回 buffer.writeBytes(in); in.release(); return buffer; }
這個方法的邏輯就是先判斷當前的額ByteBuf的記憶體大小是否足夠來選擇是否進行擴容,之後會將新讀進來的資料和原資料進行累加。
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { //當前的ByteBuf是否可讀 while (in.isReadable()) { //得到原始解析出來的物件的數目 int outSize = out.size(); //如果size不等於零,說明有解析出來的物件,將解析出來的物件向下傳播 if (outSize > 0) { fireChannelRead(ctx, out, outSize); //清空list out.clear(); //如果在解析過程中當前的context被移除了,就停止解析 if (ctx.isRemoved()) { break; } //重置size outSize = 0; } //得到可以解析的資料的長度記錄下來 int oldInputLength = in.readableBytes(); //呼叫子類的解析方法進行解析 decode(ctx, in, out); // 如果在解析過程中當前的context被移除了,就停止解析 if (ctx.isRemoved()) { break; } //如果list的size沒有改變,說明沒有解析出來物件 if (outSize == out.size()) { //這一步代表著沒有進行解析,說明讀進來的資料還不夠,需要繼續讀一點 if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //當已經解析出來物件了,但是沒有讀資料,要拋異常 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } //如果讀一次資料只解析一次,也會break if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
這個方法邏輯是這樣的:檢查儲存解析出來的物件的list是否為空來判斷是否需要向下傳播解析出來的物件,呼叫子類的解析方法,然後對解析結束後,是否解析出來物件,是否進行了讀來做相應的處理。
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
將解析出來的物件一個個的向下傳播。