netty自定義協議解碼
繼承ByteToMessageDecoder類複寫decode方法,專案中的一段解碼規則如下:
1、服務端接收報文分為兩種型別,單幀包:head為20位元組,多幀包:head為24位元組。位元組位置:5
2、表示報文體長度欄位為2個位元組,位元組開始位置:18
3、先讀取一次buffer(快取區),檢視長度是否大於20,小於20指標回滾不作處理,等待下次讀取,如果
大於20,取出幀型別欄位判斷是否為多幀包,再取出報文體長度,如果為多幀包,則包體長度加4,最後
擷取包頭加包體的長度報文,返回完整的一個包;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
public class MsgPackDecode extends ByteToMessageDecoder{
private static int FRAMETYPE_OFFSET = 5;//幀型別位元組位置
private static int HEAD_LENGHT = 20;//單幀包頭部長度 如果為多幀包為24
private static int bodyStartPos = 18;//包體長度標識起始位置
private static int LENGHT_OFFSET = 2;//包體長度標識長度
private static ByteBuf buf = Unpooled.buffer();//建立一個ByteBuf快取區
private static java.util.concurrent.atomic.AtomicInteger c = new AtomicInteger(1);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in,
List<Object> out) throws Exception {
Object o = decode(ctx, in);
if (o != null) {
out.add(o);
System.err.println(c.getAndIncrement());
}
}
//自定義協議解碼
private Object decode(ChannelHandlerContext ctx, ByteBuf in) {
//標記讀指標位置,以便可以回滾指標
in.markReaderIndex();
//如果讀取的包長度不夠head_length,回滾指標不做處理,等待下個包再解析
if(in.readableBytes() < HEAD_LENGHT){
in.resetReaderIndex();
return null;
} else {
//讀取包頭中幀型別(0:單幀包1:多幀包)資訊
in.readBytes(buf, HEAD_LENGHT);
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
//判斷幀型別(0:單幀包1:多幀包)
int frameType = (req[FRAMETYPE_OFFSET] & 0x04) == 0x04 ? 1 : 0;//獲取幀型別
int bodylenght = byteToint(subBytes(req,bodyStartPos,LENGHT_OFFSET));//獲取體長度
if(frameType==0){
//單幀包
// 如發現剩餘位元組不夠包體長度,回滾指標,等待下次解碼
if (in.readableBytes() < bodylenght) {
in.resetReaderIndex();
return null;
}
in.readBytes(buf, bodylenght);
ByteBuf frame = ctx.alloc().buffer(HEAD_LENGHT + bodylenght);
frame.writeBytes(buf, 0, HEAD_LENGHT + bodylenght);
buf.clear();
return frame;
}else {
//多幀包
// 如發現剩餘位元組不夠包體長度,回滾指標,等待下次解碼
if (in.readableBytes() < bodylenght+4) {
in.resetReaderIndex();
return null;
}
in.readBytes(buf, bodylenght+4);
ByteBuf frame = ctx.alloc().buffer(HEAD_LENGHT + bodylenght+4);
frame.writeBytes(buf, 0, HEAD_LENGHT + bodylenght+4);
buf.clear();
return frame;
}
}
}
/**
* 從一個byte[]陣列中擷取一部分
* @param src
* @param begin
* @param count
* @return
*/
public static byte[] subBytes(byte[] req, int begin, int count) {
byte[] bs = new byte[count];
for (int i=begin; i<begin+count; i++) bs[i-begin] = req[i];
return bs;
}
/**
* 位元組轉int
* @param src
* @param begin
* @param count
* @return
*/
public static int byteToint(byte[] res) {
// 一個byte資料左移24位變成0x??000000,再右移8位變成0x00??0000
int targets = (res[0] << 8 & 0xFF00) | (res[1] & 0xFF);
return targets;
}
}
這種方式是為了解決自定義協議tcp粘包的問題。