netty 二之pipeline
一 netty簡單示例
先寫一個簡單的netty長連線Demo
服務端主要包括連線類(bootstrap)和業務處理類(channelHandler),另外一個server啟動類,可以與連線類合併。
公用的類為訊息和訊息編碼,訊息解碼類。
訊息類:
public class RequestInfoVO { private String body; private int Type; private int Sequence; public String getBody() { return body; } public void setBody(String body) { this.body = body; } public int getType() { return Type; } public void setType(int type) { Type = type; } public int getSequence() { return Sequence; } public void setSequence(int sequence) { Sequence = sequence; } }
訊息解碼器:
public class MessageDecoder extends ByteToMessageDecoder { private static final int MAGIC_NUMBER = 0x0CAFFEE0; public MessageDecoder() { } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 14) { return; } // 標記開始讀取位置 in.markReaderIndex(); int magic_number = in.readInt(); if (MAGIC_NUMBER != magic_number) { ctx.close(); return; } @SuppressWarnings("unused") byte version = in.readByte(); byte type = in.readByte(); int squence = in.readInt(); int length = in.readInt(); if (length < 0) { ctx.close(); return; } if (in.readableBytes() < length) { // 重置到開始讀取位置 in.resetReaderIndex(); return; } byte[] body = new byte[length]; in.readBytes(body); RequestInfoVO req = new RequestInfoVO(); req.setBody(new String(body, "utf-8")); req.setType(type); req.setSequence(squence); out.add(req); } }
訊息編碼器:
public class MessageEncoder extends MessageToByteEncoder<RequestInfoVO> { private static final String DEFAULT_ENCODE = "utf-8"; private static final int MAGIC_NUMBER = 0x0CAFFEE0; public MessageEncoder() { } @Override protected void encode(ChannelHandlerContext ctx, RequestInfoVO msg, ByteBuf out) throws Exception { @SuppressWarnings("resource") ByteBufOutputStream writer = new ByteBufOutputStream(out); byte[] body = null; if (null != msg && null != msg.getBody() && "" != msg.getBody()) { body = msg.getBody().getBytes(DEFAULT_ENCODE); } writer.writeInt(MAGIC_NUMBER); writer.writeByte(1); writer.writeByte(msg.getType()); writer.writeInt(msg.getSequence()); if (null == body || 0 == body.length) { writer.writeInt(0); } else { writer.writeInt(body.length); writer.write(body); } } }
服務端連線類:
public class NettyServerBootstrap {
private Integer port;
private SocketChannel socketChannel;
public NettyServerBootstrap(Integer port) throws Exception {
this.port = port;
bind(port);
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public void setSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
private void bind(int serverPort) throws Exception {
// 連線處理group
EventLoopGroup boss = new NioEventLoopGroup();
// 事件處理group
EventLoopGroup worker = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
// 繫結處理group
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
// 保持連線數
bootstrap.option(ChannelOption.SO_BACKLOG, 1024 * 1024);
// 有資料立即傳送
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 保持連線
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// 處理新連線
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 增加任務處理
ChannelPipeline p = sc.pipeline();
/**
* 這裡要注意的地方:
* 1.順序是要注意的,解碼器必須排在入境處理類之前,因為入境資料的流動時從做到右;
* 編碼器必須放在出境處理類之前,因為出境資料的流動時從右到左
* 2.同時配置幾個解碼器或者同時配置幾個編碼器都很容易出現問題,一般只配置一個解碼器和一個編碼器
*/
p.addLast(new MessageDecoder(), new MessageEncoder(), new NettyServerHandler());
}
});
ChannelFuture f = bootstrap.bind(serverPort).sync();
if (f.isSuccess()) {
System.out.println("long connection started success");
} else {
System.out.println("long connection started fail");
}
}
服務端業務處理handler:
public class NettyServerHandler extends SimpleChannelInboundHandler<RequestInfoVO> {
// private static final Log log = LogFactory.getLog(NettyServerHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestInfoVO msg) throws Exception {
System.out.println(msg.getBody());
//
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
}
}
客戶端:
客戶端與服務端共用訊息類和解碼器,編碼器。
客戶端連線類:
public class NettyClientBootstrap {
private int port;
private String host;
private SocketChannel socketChannel;
public NettyClientBootstrap(int port, String host) throws Exception {
this.host = host;
this.port = port;
start();
}
private void start() throws Exception {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.group(eventLoopGroup);
bootstrap.remoteAddress(this.host, this.port);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MessageDecoder(), new MessageEncoder(), new NettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect(this.host, this.port).sync();
if (future.isSuccess()) {
socketChannel = (SocketChannel) future.channel();
System.out.println("connect server success|");
}
}
public int getPort() {
return this.port;
}
public void setPort(int port) {
this.port = port;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public void setSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
}
客戶端業務處理handler:
public class NettyClientHandler extends SimpleChannelInboundHandler<RequestInfoVO> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestInfoVO msg)
throws Exception {
System.out.println(msg.getBody());
RequestInfoVO req = new RequestInfoVO();
req.setSequence(msg.getSequence());
req.setType(msg.getType());
if (2 == msg.getType()) {
req.setBody("client");
ctx.channel().writeAndFlush(req);
} else if (3 == msg.getType()) {
req.setBody("zpksb");
ctx.channel().writeAndFlush(req);
}
}
}
最後是啟動類:
服務端:
public class Server {
public static void main(String[] args) {
try {
new NettyServerBootstrap(9999);
} catch (Exception e) {
e.printStackTrace();
}
}
}
客戶端:
public class Client {
public static void main(String[] args) throws Exception {
NettyClientBootstrap bootstrap = new NettyClientBootstrap(9999, "127.0.0.1");
int i = 1;
while (true) {
TimeUnit.SECONDS.sleep(2);
System.out.println("send heartbeat!");
RequestInfoVO req = new RequestInfoVO();
req.setSequence(123456);
req.setType((byte) 1);
req.setSequence(0);
req.setBody(String.valueOf((new Date()).getTime()));
TestVO vo = new TestVO();
vo.setReq(1);
vo.setName("582552252525");
// bootstrap.getSocketChannel().write(req);
bootstrap.getSocketChannel().writeAndFlush(vo);
i++;
}
}
}
上面程式碼的主要功能為一個實現長連線,同時客戶端定時會向服務端傳送心跳包類檢測連線是否正常。
有幾個需要注意的地方:
1.伺服器的連線類中有一行關鍵的程式碼:
p.addLast(new MessageDecoder(), new MessageEncoder(), new NettyServerHandler());
這一行程式碼是一行關鍵的程式碼,理解了這一行程式碼,對netty的整個工作流程的理解非常有幫助:
解碼器MessageDecoder,編碼器MessageEncoder,業務處理NettyServerHandler三個都是ChannelHandler,那麼一條訊息傳送到服務端以後是如何處理的一個流程呢,先要了解一下netty的訊息處理機制
對於一條訊息,netty是通過pipeline事件傳遞的方式處理。
netty是採用雙向連結串列的方式來處理訊息事件,用到了責任鏈模式。還有一個概念,就是訊息也有分為:入境(inbound)訊息和出境(outbound)訊息,而handler則也分為入境(inbound)處理handler和出境(outbound)處理handler。
處理過程為:
一個入境訊息是從左到右(header開始向後遍歷),依次被每個為入境型別handler處理(處理過程中的handler也可以選擇終止訊息傳播,直接返回)。最後一個handler處理後返回。即一條入境訊息沿著handler鏈條從左到右處理。
一個出境訊息是從右到左(tail開始向前遍歷),依次被每個為出境型別handler處理(處理過程中的handler也可以選擇終止訊息傳播,直接返回)。最後一個handler處理後返回。一條出境訊息沿著handler鏈條從右到左處理。
再來看上面的情況:
上面的三個handler組成一條鏈條:MessageDecoder->MessageEncoder->NettyServerHandler
其中兩個入境handler:MessageDecoder和NettyServerHandler,一個出境handler:MessageEncoder
一個客戶端傳送一個訊息過來,對於服務端來說這是一個入境事件,所以先後會經過MessageDecoder和NettyServerHandler處理,MessageDecoder先將網路傳送過來的位元組陣列進行解碼(反序列化)成java物件。然後交給業務處理類NettyServerHandler處理
現在如果假設把順序替換一下。成下面這樣
p.addLast(new NettyServerHandler(),new MessageDecoder(), new MessageEncoder());
然業務處理類NettyServerHandler在解碼類MessageDecoder前面,會發現什麼狀況?
這是訊息首先經過NettyServerHandler,但是這個時候訊息並不是物件形式的,而是位元組陣列形式的,所以NettyServerHandler並不處理訊息,而是將其傳播到解碼器MessageDecoder。這時候入境訊息就沒有被處理。
可見在載入handler鏈條的時候,要確保解碼器類在入境業務處理類的前面(入境訊息從左到右處理);同理編碼器要放在出境業務處理類的前面(出境訊息從右到左處理)。
還有一點,業務處理類NettyServerHandler中有RequestInfoVO的泛型,這個表示該業務處理類只會攔截解碼後為RequestInfoVO型別的物件進行處理,如果解碼後為其他型別的物件,則不處理,向下一個入境handler傳播訊息。
多個解碼器的情況
上面是一個RequestInfoVO訊息的情況,其可以處理查詢使用者資訊請求,假設使用者現在需要退出登入,有兩種方式:
1.將RequestInfoVO擴充套件,讓其也可以表示退出登陸訊息
這個時候只需要對解碼器進行相應的修改,讓其處理擴充套件後的物件
2.新建一個訊息類RequetLogoutVO類,然其表示退出登陸訊息
這種情況下,需要新增一個解碼器MessageDecoder2,然解碼器來處理RequetLogoutVO型別的訊息
這就引申出了一個問題,當存在多個解碼器的時候流程如何處理?
假設現在新增了一個解碼器,變為:
p.addLast(new MessageDecoder(),new MessageDecoder2(),new MessageEncoder(),new NettyServerHandler());
現在RequetLogoutVO類位元組陣列訊息傳送到服務端後:
先經過MessageDecoder解碼,這個時候訊息變為RequetLogoutVO類JAVA物件,其再經過解碼器MessageDecoder2,這個時候解碼器會對JAVA物件RequetLogoutVO進行再一次的解碼,變為一個為止的訊息,這個訊息到達處理類NettyServerHandler已經變為亂碼,無法正常處理。
所以在一個系統中,只定義一個解碼器和一個編碼器,要處理多種不同型別的訊息是通過擴充套件訊息類來實現,即一個通用的訊息類可以表示各種型別的業務處理訊息。
綜合上面的分析,總結一下:
1.pipeline載入handler鏈的順序有要求,解碼器要在入境業務處理類前面;編碼器要在出境業務處理類前面;
2.業務處理類攔截的是解碼後為其類泛型中定義的java物件,解碼不為其泛型定義的java物件的,其不處理,向後傳播訊息;
3.一個系統中只定義一個解碼器和一個編碼器,定義多個解碼器和多個編碼器會產生不可預知的錯誤。