1. 程式人生 > >Netty之ByteToMessageDecoder

Netty之ByteToMessageDecoder

Netty中所有的解碼器都是基於ByteToMessageDecoder來實現的,他的實現原理如下: 1、累加位元組流 2、呼叫子類的decode方法進行解析 3、將解析得到ByteBuf向下傳播

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//如果當前傳進來的物件是ByteBuf型別的,那麼就直接丟給解碼器進行處理,否則向下傳播
        if (msg instanceof ByteBuf) {
        	//例項化一個list
            CodecOutputList out = CodecOutputList.newInstance();
            try {
            	//將物件強制轉換成bytebuf
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                //如果是null,說明是剛開始解析,直接把二進位制字串流賦值
                if (first) {
                    cumulation = data;
                } else {
                	//否則將二進位制字串流進行累加
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //呼叫解碼器進行累加
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                //將解析出來的物件的集合向下傳播
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

這個方法裡面總共做了這麼幾件事情: 1、將傳進來的二進位制字串流進行累加 2、呼叫解碼器解碼累加後的二進位制字串 3、向下傳播解析出來的物件 下面我們分別看一下每個步驟是怎麼來實現的:

public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            ByteBuf buffer;
            //檢查記憶體空間大小是否足夠,來選擇是否擴容
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) 
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            //將新的到的二進位制字串流寫進ByteBuf進行累加並返回
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }

這個方法的邏輯就是先判斷當前的額ByteBuf的記憶體大小是否足夠來選擇是否進行擴容,之後會將新讀進來的資料和原資料進行累加。

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
        	//當前的ByteBuf是否可讀
            while (in.isReadable()) {
            	//得到原始解析出來的物件的數目
                int outSize = out.size();
				//如果size不等於零,說明有解析出來的物件,將解析出來的物件向下傳播
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    //清空list
                    out.clear();
					//如果在解析過程中當前的context被移除了,就停止解析
                    if (ctx.isRemoved()) {
                        break;
                    }
                    //重置size
                    outSize = 0;
                }
				//得到可以解析的資料的長度記錄下來
                int oldInputLength = in.readableBytes();
                //呼叫子類的解析方法進行解析
                decode(ctx, in, out);

                // 如果在解析過程中當前的context被移除了,就停止解析
                if (ctx.isRemoved()) {
                    break;
                }
				//如果list的size沒有改變,說明沒有解析出來物件
                if (outSize == out.size()) {
                	//這一步代表著沒有進行解析,說明讀進來的資料還不夠,需要繼續讀一點
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }
				//當已經解析出來物件了,但是沒有讀資料,要拋異常
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
				//如果讀一次資料只解析一次,也會break
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Throwable cause) {
            throw new DecoderException(cause);
        }
    }

這個方法邏輯是這樣的:檢查儲存解析出來的物件的list是否為空來判斷是否需要向下傳播解析出來的物件,呼叫子類的解析方法,然後對解析結束後,是否解析出來物件,是否進行了讀來做相應的處理。

static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
        for (int i = 0; i < numElements; i ++) {
            ctx.fireChannelRead(msgs.getUnsafe(i));
        }
    }

將解析出來的物件一個個的向下傳播。