netty解碼器之ReplayingDecoder
阿新 • • 發佈:2020-12-31
技術標籤:netty
解碼器之ReplayingDecoder
ReplayingDecoder繼承了ByteToMessageDecoder,是一個解碼器,是ByteToMessageDecoder的一個特殊變體。
ReplayingDecoder和ByteToMessageDecoder的區別
ReplayingDecoder和ByteToMessageDecoder最大的不同在於,ReplayingDecoder允許讓你實現decode()方法,就像已經接收到所有所需的位元組,而不用去檢查所需位元組的可用性。
假設現在自定義協議傳遞ByteBuf格式如下:
協議傳遞的資料需要封裝為如下格式:
import lombok.Data;
@Data
public class CustomProtocol {
private int length;
private byte[] content;
}
使用ByteToMessageDecoder處理粘包半包問題
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec. ByteToMessageDecoder;
import java.util.List;
public class CustomProtocolDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes() < 4) { // length is ready?
return ;
}
in.markReaderIndex();
int length = in.readInt();
if(in.readableBytes() < length) { // content is ready?
in.resetReaderIndex();
return;
}
CustomProtocol customProtocol = new CustomProtocol();
customProtocol.setLength(length);
byte[] bytes = new byte[length];
in.readBytes(bytes);
customProtocol.setContent(bytes);
out.add(customProtocol);
}
}
使用ReplayingDecoder處理粘包半包問題
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int length = in.readInt();
CustomProtocol customProtocol = new CustomProtocol();
customProtocol.setLength(length);
byte[] bytes = new byte[length];
in.readBytes(bytes);
customProtocol.setContent(bytes);
out.add(customProtocol);
}
}
ReplayingDecoder底層如何實現的
ReplayingDecoder通過一個專門的ByteBuf實現,當緩衝區中沒有足夠的資料時,會丟擲一個某種型別的Error。在上面的IntegerHeaderFrameDecoder中,當呼叫in.readInt()時,只需要假設緩衝區有4個或更多的位元組。如果,緩衝區中確實有4個位元組,會返回你所期望的integer header。否則,會丟擲這個Error。ReplayingDecoder會捕獲這個Error,然後重置緩衝區的readerindex到緩衝區的開始位置,並且當更多的資料到達緩衝區時,會再次呼叫decode()方法。
需要注意一些限制:
- buffer的部分操作,如readBytes(ByteBuffer dst)、retain()、release()等方法會直接丟擲異常。
- 如果網路緩慢而且訊息格式複雜,效能會變差,在這種情況,你的解碼器不得不解碼同一部分的資訊。
- 解碼一條資訊,decode()方法可以被多次呼叫,如下程式碼不能正常工作:
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<>();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
正確的做法如下:
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<>();
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
values.clear(); // 沒有這一行就會有問題
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// This assertion will fail intermittently since values.offer()
// can be called more than two times!
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
提升ReplayingDecoder的效能
複雜解碼器的效能可以使用checkpoint()方法得到顯著的提高,checkpoint()方法更新緩衝區的“初始”位置,ReplayingDecoder將回滾緩衝區的readerIndex到最後呼叫checkpoint()方法的位置。
帶列舉引數的checkpoint()
管理解碼器狀態的最簡單方法是建立表示解碼器當前狀態的Enum型別,並在狀態改變時呼叫checkpoint(T)方法,你可以有許多你想要的狀態取決於你想解碼的資訊的複雜性:
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class IntegerHeaderFrameDecoder2 extends ReplayingDecoder<MyDecoderState> {
private int length;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
無參的checkpoint()
也可以自己管理緩衝器的狀態,不適用列舉型別。
package com.morris.netty.code.replayingdecoder;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
public class IntegerHeaderFrameDecoder3 extends ReplayingDecoder<MyDecoderState> {
private boolean readLength; // 通過這個欄位來標記當前的狀態
private int length;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
if (!readLength) {
length = buf.readInt();
readLength = true;
checkpoint();
} else {
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
checkpoint();
}
}
}