netty原始碼分析(二十二)Netty編解碼器剖析與入站出站處理器詳解
Netty處理器重要概念:
1、Netty的處理器可以分為兩類:入棧處理器和出棧處理器。
2、入棧處理器的頂層是ChannelInboundHandler,出棧處理器的頂層是ChannelOutboundHandler。
3、資料處理時常用的各種編解碼器本質上都是處理器。
4、編解碼器:無論我們是向網路中寫入資料是什麼型別(int、char、String、二進位制等),資料在網路中傳遞時,其都是以位元組流的形式出現的;將資料由原本的形式轉換為位元組流的操作稱為編碼(encode),將資料由位元組轉換為它原本的格式或是其他格式的操作稱為解碼(decode),編碼統一稱為codec。
5、編碼:本質上是一種出棧處理器;因此,編碼一定是一種ChannelOutboundHandler。
6、解碼:本質上是一種入棧處理器,因此。解碼一定是一種ChannelInboundHandler。
7、在Netty中,編碼器通常以XXXEncoder命名;解碼器通常以XXXDecoder命名。
netty下邊有很多編解碼器的實現:
實際開發的過程中我們可以去使用它們,我們要講的不是去使用它們,現在以一個例子來說明編解碼的一些內幕:
netty提供了一個位元組到訊息的轉換器(ByteToMessageDecoder):
接下來我們使用ByteToMessageDecoder自己實現一個解碼器:
public class MyByteToLongDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decode invoked");
System.out.println(in.readableBytes());
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
然後需要一個將Long型別的資料轉換為byte的書裝換器:
使用MessageToByteEncoder:
服務端:
/**
* Created by Administrator on 2017/5/20.
* 伺服器和客戶端互發程式
*/
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new MyServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服務端初始化:
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyByteToLongDecoder());
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyServerHandler());
}
}
服務端handler:
public class MyServerHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+" --> "+msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客戶端:
public class Myclient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientIniatializer());
ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();
channelFuture.channel().writeAndFlush("hello");
channelFuture.channel().closeFuture().sync();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}
客戶端的initializer:
public class MyClientIniatializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipline = ch.pipeline();
pipline.addLast(new MyByteToLongDecoder());
pipline.addLast(new MyLongToByteEncoder());
pipline.addLast(new MyClientHandler());
}
}
客戶端handler:
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output "+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(123456L);//一定要加L,否則會作為int型別處理,最終導致訊息傳送不出去。
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
位元組到Long型別的解碼器(解析網路傳過來的資料):
public class MyByteToLongDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decode invoked");
System.out.println(in.readableBytes());
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}
}
Long型別轉換為位元組(傳送到網路之前的轉換):
public class MyLongToByteEncoder extends MessageToByteEncoder<Long> {
@Override
protected void encode(ChannelHandlerContext ctx, Long msg, ByteBuf out) throws Exception {
System.out.println("encode invoked");
System.out.println(msg);
out.writeLong(msg);
}
}
執行服務端,之後執行客戶端,列印輸出:
server端輸出:
decode invoked
8
/127.0.0.1:4679 --> 123456
client輸出:
encode invoked
123456
至於為什麼是這樣一個過程:
首先客戶端啟動之後,呼叫MyLongToByteEncoder的encode方法列印“encode invoked”和傳送的資料“123456”。
服務段接受到之後呼叫MyByteToLongDecoder的decode列印“decode invoked”和資料長度“8”,之後是呼叫MyClientHandler的channelRead0列印“/127.0.0.1:4679 –> 123456”
接下來我們修改MyClientHandler如下:
public class MyClientHandler extends SimpleChannelInboundHandler<Long> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Long msg) throws Exception {
System.out.println(ctx.channel().remoteAddress());
System.out.println("client output "+msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(123456L);
/*
都可以傳送
ctx.writeAndFlush(1L);
ctx.writeAndFlush(2L);
ctx.writeAndFlush(3L);
ctx.writeAndFlush(4L);*/
ctx.writeAndFlush(Unpooled.copiedBuffer("helloworld", Charset.forName("utf-8")));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
ctx.close();
}
}
ctx.writeAndFlush(Unpooled.copiedBuffer(“helloworld”, Charset.forName(“utf-8”)));
這行程式碼,即傳送一個Buffer,我們檢視控制檯列印的資料:
server端的列印:
decode invoked
10
/127.0.0.1:6394 --> 7522537965574647666
decode invoked
2
客戶端沒有任何輸出,因為我們的客戶端使用的是ByteBuff,而客戶端的編碼器是Long型別的,所以編碼器沒有執行,直接把資料丟給了Socket傳輸到了網路,所以服務端會收到資料,我們傳送的資料是“helloworld”由於是utf-8,所以一個英文字元是一個位元組,一共是10個位元組,我們解碼器只有在大於8個位元組的時候才會對其進行解碼然後給到下一個處理器,所以10個位元組前8個通過了解碼器,去了下一個handler,而剩下的2個沒有通過解碼器,服務端列印的“/127.0.0.1:6394 –> 7522537965574647666”後邊的那串數字是8個位元組的資料。
關於netty編解碼器的重要結論:
1、無論是編碼器還是解碼器,其接受的訊息型別必須要與待處理的引數型別一致,否則該編碼器或解碼器並不會執行。
2、在解碼器進行資料解碼時,一定要記得判斷緩衝(ByteBuf)中的資料是否 足夠,否則將會產生一些問題。
例如上邊的例子判斷是否是8個長度(因為long是佔用8個位元組的資料型別):
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
System.out.println("decode invoked");
System.out.println(in.readableBytes());
if(in.readableBytes()>=8){
out.add(in.readLong());
}
}