1. 程式人生 > 其它 >netty解碼器之ReplayingDecoder

netty解碼器之ReplayingDecoder

技術標籤:netty

解碼器之ReplayingDecoder

ReplayingDecoder繼承了ByteToMessageDecoder,是一個解碼器,是ByteToMessageDecoder的一個特殊變體。

ReplayingDecoder和ByteToMessageDecoder的區別

ReplayingDecoder和ByteToMessageDecoder最大的不同在於,ReplayingDecoder允許讓你實現decode()方法,就像已經接收到所有所需的位元組,而不用去檢查所需位元組的可用性。

假設現在自定義協議傳遞ByteBuf格式如下:

在這裡插入圖片描述

協議傳遞的資料需要封裝為如下格式:

import lombok.Data;

@Data
public class CustomProtocol {

    private int length;
    private byte[] content;
}

使用ByteToMessageDecoder處理粘包半包問題

package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.
ByteToMessageDecoder; import java.util.List; public class CustomProtocolDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if(in.readableBytes() < 4) { // length is ready? return
; } in.markReaderIndex(); int length = in.readInt(); if(in.readableBytes() < length) { // content is ready? in.resetReaderIndex(); return; } CustomProtocol customProtocol = new CustomProtocol(); customProtocol.setLength(length); byte[] bytes = new byte[length]; in.readBytes(bytes); customProtocol.setContent(bytes); out.add(customProtocol); } }

使用ReplayingDecoder處理粘包半包問題

package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {

        int length = in.readInt();

        CustomProtocol customProtocol = new CustomProtocol();
        customProtocol.setLength(length);
        byte[] bytes = new byte[length];
        in.readBytes(bytes);
        customProtocol.setContent(bytes);

        out.add(customProtocol);
    }
}

ReplayingDecoder底層如何實現的

ReplayingDecoder通過一個專門的ByteBuf實現,當緩衝區中沒有足夠的資料時,會丟擲一個某種型別的Error。在上面的IntegerHeaderFrameDecoder中,當呼叫in.readInt()時,只需要假設緩衝區有4個或更多的位元組。如果,緩衝區中確實有4個位元組,會返回你所期望的integer header。否則,會丟擲這個Error。ReplayingDecoder會捕獲這個Error,然後重置緩衝區的readerindex到緩衝區的開始位置,並且當更多的資料到達緩衝區時,會再次呼叫decode()方法。

需要注意一些限制:

  • buffer的部分操作,如readBytes(ByteBuffer dst)、retain()、release()等方法會直接丟擲異常。
  • 如果網路緩慢而且訊息格式複雜,效能會變差,在這種情況,你的解碼器不得不解碼同一部分的資訊。
  • 解碼一條資訊,decode()方法可以被多次呼叫,如下程式碼不能正常工作:
package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class MyDecoder extends ReplayingDecoder<Void> {

    private final Queue<Integer> values = new LinkedList<>();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());

        // This assertion will fail intermittently since values.offer()
        // can be called more than two times!
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}

正確的做法如下:

package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

public class MyDecoder extends ReplayingDecoder<Void> {

    private final Queue<Integer> values = new LinkedList<>();

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

        values.clear(); // 沒有這一行就會有問題

        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());

        // This assertion will fail intermittently since values.offer()
        // can be called more than two times!
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}

提升ReplayingDecoder的效能

複雜解碼器的效能可以使用checkpoint()方法得到顯著的提高,checkpoint()方法更新緩衝區的“初始”位置,ReplayingDecoder將回滾緩衝區的readerIndex到最後呼叫checkpoint()方法的位置。

帶列舉引數的checkpoint()

管理解碼器狀態的最簡單方法是建立表示解碼器當前狀態的Enum型別,並在狀態改變時呼叫checkpoint(T)方法,你可以有許多你想要的狀態取決於你想解碼的資訊的複雜性:

package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class IntegerHeaderFrameDecoder2 extends ReplayingDecoder<MyDecoderState> {

    private int length;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        switch (state()) {
            case READ_LENGTH:
                length = buf.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
            case READ_CONTENT:
                ByteBuf frame = buf.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                out.add(frame);
                break;
            default:
                throw new Error("Shouldn't reach here.");
        }
    }
}

無參的checkpoint()

也可以自己管理緩衝器的狀態,不適用列舉型別。

package com.morris.netty.code.replayingdecoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

public class IntegerHeaderFrameDecoder3 extends ReplayingDecoder<MyDecoderState> {

    private boolean readLength; // 通過這個欄位來標記當前的狀態

    private int length;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {

        if (!readLength) {
            length = buf.readInt();
            readLength = true;
            checkpoint();
        } else {
            ByteBuf frame = buf.readBytes(length);
            checkpoint(MyDecoderState.READ_LENGTH);
            out.add(frame);
            checkpoint();
        }
    }
}