1. 程式人生 > >netty 二之pipeline

netty 二之pipeline

 

一 netty簡單示例

先寫一個簡單的netty長連線Demo

服務端主要包括連線類(bootstrap)和業務處理類(channelHandler),另外一個server啟動類,可以與連線類合併。

公用的類為訊息和訊息編碼,訊息解碼類。

訊息類:

public class RequestInfoVO {
	
	private String body;
	
	private int  Type;
	private int  Sequence;
	
	public String getBody() {
		return body;
	}
	public void setBody(String body) {
		this.body = body;
	}
	public int getType() {
		return Type;
	}
	public void setType(int type) {
		Type = type;
	}
	public int getSequence() {
		return Sequence;
	}
	public void setSequence(int sequence) {
		Sequence = sequence;
	}
}

訊息解碼器:

public class MessageDecoder extends ByteToMessageDecoder {
	private static final int MAGIC_NUMBER = 0x0CAFFEE0;
	public MessageDecoder() {
 
	}
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		if (in.readableBytes() < 14) {
			return;
		}
		// 標記開始讀取位置
		in.markReaderIndex();
 
		int magic_number = in.readInt();
 
		if (MAGIC_NUMBER != magic_number) {
			ctx.close();
			return;
		}
 
		@SuppressWarnings("unused")
		byte version = in.readByte();
 
		byte type = in.readByte();
		int squence = in.readInt();
		int length = in.readInt();
 
		if (length < 0) {
			ctx.close();
			return;
		}
 
		if (in.readableBytes() < length) {
			// 重置到開始讀取位置
			in.resetReaderIndex();
			return;
		}
 
		byte[] body = new byte[length];
		in.readBytes(body);
		RequestInfoVO req = new RequestInfoVO();
		req.setBody(new String(body, "utf-8"));
		req.setType(type);
		req.setSequence(squence);
		out.add(req);
	}
}

訊息編碼器:

public class MessageEncoder extends MessageToByteEncoder<RequestInfoVO> {
	 
	private static final String DEFAULT_ENCODE = "utf-8";
 
	private static final int MAGIC_NUMBER = 0x0CAFFEE0;
 
	public MessageEncoder() {
	}
 
	@Override
	protected void encode(ChannelHandlerContext ctx, RequestInfoVO msg, ByteBuf out) throws Exception {
 
		@SuppressWarnings("resource")
		ByteBufOutputStream writer = new ByteBufOutputStream(out);
		byte[] body = null;
 
		if (null != msg && null != msg.getBody() && "" != msg.getBody()) {
			body = msg.getBody().getBytes(DEFAULT_ENCODE);
		}
 
		writer.writeInt(MAGIC_NUMBER);
 
		writer.writeByte(1);
 
		writer.writeByte(msg.getType());
 
		writer.writeInt(msg.getSequence());
 
		if (null == body || 0 == body.length) {
			writer.writeInt(0);
		} else {
			writer.writeInt(body.length);
			writer.write(body);
		}
	}
 
}

服務端連線類:

public class NettyServerBootstrap {
	 
	private Integer port;
	private SocketChannel socketChannel;
	public NettyServerBootstrap(Integer port) throws Exception {
		this.port = port;
		bind(port);
	}
	public Integer getPort() {
		return port;
	}
	public void setPort(Integer port) {
		this.port = port;
	}
	public SocketChannel getSocketChannel() {
		return socketChannel;
	}
	public void setSocketChannel(SocketChannel socketChannel) {
		this.socketChannel = socketChannel;
	}
	private void bind(int serverPort) throws Exception {
		// 連線處理group
		EventLoopGroup boss = new NioEventLoopGroup();
		// 事件處理group
		EventLoopGroup worker = new NioEventLoopGroup();
		ServerBootstrap bootstrap = new ServerBootstrap();
		// 繫結處理group
		bootstrap.group(boss, worker);
		bootstrap.channel(NioServerSocketChannel.class);
		// 保持連線數
		bootstrap.option(ChannelOption.SO_BACKLOG, 1024 * 1024);
		// 有資料立即傳送
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		// 保持連線
		bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
       // 處理新連線
		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel sc) throws Exception {
				// 增加任務處理
				ChannelPipeline p = sc.pipeline();
				/**
				 * 這裡要注意的地方:
				 * 1.順序是要注意的,解碼器必須排在入境處理類之前,因為入境資料的流動時從做到右;
				 * 編碼器必須放在出境處理類之前,因為出境資料的流動時從右到左
				 * 2.同時配置幾個解碼器或者同時配置幾個編碼器都很容易出現問題,一般只配置一個解碼器和一個編碼器
				 */
				p.addLast(new MessageDecoder(), new MessageEncoder(), new NettyServerHandler());
			 }
		});
 
		ChannelFuture f = bootstrap.bind(serverPort).sync();
		if (f.isSuccess()) {
			System.out.println("long connection started success");
		} else {
			System.out.println("long connection started fail");
		}
	}

服務端業務處理handler:

public class NettyServerHandler extends SimpleChannelInboundHandler<RequestInfoVO> {
//	private static final Log log = LogFactory.getLog(NettyServerHandler.class);
 
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RequestInfoVO msg) throws Exception {
		System.out.println(msg.getBody());
		//
	}
 
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		
	}
 
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		
	}
 
 
	@Override
	public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
		
	}
 
}

客戶端:

客戶端與服務端共用訊息類和解碼器,編碼器。

客戶端連線類:

public class NettyClientBootstrap {
	private int port;
	private String host;
	private SocketChannel socketChannel;
	public NettyClientBootstrap(int port, String host) throws Exception {
		this.host = host;
		this.port = port;
		start();
	}
	private void start() throws Exception {
		EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
		Bootstrap bootstrap = new Bootstrap();
		bootstrap.channel(NioSocketChannel.class);
		bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
		bootstrap.option(ChannelOption.TCP_NODELAY, true);
		bootstrap.group(eventLoopGroup);
		bootstrap.remoteAddress(this.host, this.port);
		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel socketChannel) throws Exception {
				socketChannel.pipeline().addLast(new MessageDecoder(), new MessageEncoder(), new NettyClientHandler());
			}
		});
		ChannelFuture future = bootstrap.connect(this.host, this.port).sync();
		if (future.isSuccess()) {
			socketChannel = (SocketChannel) future.channel();
			System.out.println("connect server  success|");
		}
	}
	public int getPort() {
		return this.port;
	}
	public void setPort(int port) {
		this.port = port;
	}
 
	public SocketChannel getSocketChannel() {
		return socketChannel;
	}
	public void setSocketChannel(SocketChannel socketChannel) {
		this.socketChannel = socketChannel;
	}
	public String getHost() {
		return host;
	}
	public void setHost(String host) {
		this.host = host;
	}
}

客戶端業務處理handler:

public class NettyClientHandler extends SimpleChannelInboundHandler<RequestInfoVO> {

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RequestInfoVO msg)
			throws Exception {
		System.out.println(msg.getBody());
		RequestInfoVO req = new RequestInfoVO();
		req.setSequence(msg.getSequence());
		req.setType(msg.getType());
		if (2 == msg.getType()) {
			req.setBody("client");
			ctx.channel().writeAndFlush(req);
		} else if (3 == msg.getType()) {
			req.setBody("zpksb");
			ctx.channel().writeAndFlush(req);
		}
		
	}
 
}

最後是啟動類:

服務端:

public class Server {
	public static void main(String[] args) {
		try {
			new NettyServerBootstrap(9999);
		} catch (Exception e) {
			
			e.printStackTrace();
		}
}


}

客戶端:

public class Client {
	
	public static void main(String[] args) throws Exception {
		NettyClientBootstrap bootstrap = new NettyClientBootstrap(9999, "127.0.0.1");
		int i = 1;
 
		while (true) {
			TimeUnit.SECONDS.sleep(2);
			System.out.println("send heartbeat!");
			
			RequestInfoVO req = new RequestInfoVO();
			req.setSequence(123456);
			req.setType((byte) 1);
			req.setSequence(0);
			req.setBody(String.valueOf((new Date()).getTime()));
			
			TestVO vo = new TestVO();
			vo.setReq(1);
			vo.setName("582552252525");
			
//			bootstrap.getSocketChannel().write(req);
			bootstrap.getSocketChannel().writeAndFlush(vo);
			i++;
		}
	}


}

上面程式碼的主要功能為一個實現長連線,同時客戶端定時會向服務端傳送心跳包類檢測連線是否正常。
有幾個需要注意的地方:

1.伺服器的連線類中有一行關鍵的程式碼:

p.addLast(new MessageDecoder(), new MessageEncoder(), new NettyServerHandler());

這一行程式碼是一行關鍵的程式碼,理解了這一行程式碼,對netty的整個工作流程的理解非常有幫助:

解碼器MessageDecoder,編碼器MessageEncoder,業務處理NettyServerHandler三個都是ChannelHandler,那麼一條訊息傳送到服務端以後是如何處理的一個流程呢,先要了解一下netty的訊息處理機制

對於一條訊息,netty是通過pipeline事件傳遞的方式處理。

 

netty是採用雙向連結串列的方式來處理訊息事件,用到了責任鏈模式。還有一個概念,就是訊息也有分為:入境(inbound)訊息和出境(outbound)訊息,而handler則也分為入境(inbound)處理handler和出境(outbound)處理handler。

處理過程為:

一個入境訊息是從左到右(header開始向後遍歷),依次被每個為入境型別handler處理(處理過程中的handler也可以選擇終止訊息傳播,直接返回)。最後一個handler處理後返回。即一條入境訊息沿著handler鏈條從左到右處理。

一個出境訊息是從右到左(tail開始向前遍歷),依次被每個為出境型別handler處理(處理過程中的handler也可以選擇終止訊息傳播,直接返回)。最後一個handler處理後返回。一條出境訊息沿著handler鏈條從右到左處理。

再來看上面的情況:

上面的三個handler組成一條鏈條:MessageDecoder->MessageEncoder->NettyServerHandler

其中兩個入境handler:MessageDecoder和NettyServerHandler,一個出境handler:MessageEncoder

一個客戶端傳送一個訊息過來,對於服務端來說這是一個入境事件,所以先後會經過MessageDecoder和NettyServerHandler處理,MessageDecoder先將網路傳送過來的位元組陣列進行解碼(反序列化)成java物件。然後交給業務處理類NettyServerHandler處理

現在如果假設把順序替換一下。成下面這樣

p.addLast(new NettyServerHandler(),new MessageDecoder(), new MessageEncoder());

然業務處理類NettyServerHandler在解碼類MessageDecoder前面,會發現什麼狀況?

這是訊息首先經過NettyServerHandler,但是這個時候訊息並不是物件形式的,而是位元組陣列形式的,所以NettyServerHandler並不處理訊息,而是將其傳播到解碼器MessageDecoder。這時候入境訊息就沒有被處理。

可見在載入handler鏈條的時候,要確保解碼器類在入境業務處理類的前面(入境訊息從左到右處理);同理編碼器要放在出境業務處理類的前面(出境訊息從右到左處理)。

還有一點,業務處理類NettyServerHandler中有RequestInfoVO的泛型,這個表示該業務處理類只會攔截解碼後為RequestInfoVO型別的物件進行處理,如果解碼後為其他型別的物件,則不處理,向下一個入境handler傳播訊息。

 

多個解碼器的情況

上面是一個RequestInfoVO訊息的情況,其可以處理查詢使用者資訊請求,假設使用者現在需要退出登入,有兩種方式:

1.將RequestInfoVO擴充套件,讓其也可以表示退出登陸訊息

這個時候只需要對解碼器進行相應的修改,讓其處理擴充套件後的物件

2.新建一個訊息類RequetLogoutVO類,然其表示退出登陸訊息

這種情況下,需要新增一個解碼器MessageDecoder2,然解碼器來處理RequetLogoutVO型別的訊息

這就引申出了一個問題,當存在多個解碼器的時候流程如何處理?

假設現在新增了一個解碼器,變為:

p.addLast(new MessageDecoder(),new MessageDecoder2(),new MessageEncoder(),new NettyServerHandler());

現在RequetLogoutVO類位元組陣列訊息傳送到服務端後:

先經過MessageDecoder解碼,這個時候訊息變為RequetLogoutVO類JAVA物件,其再經過解碼器MessageDecoder2,這個時候解碼器會對JAVA物件RequetLogoutVO進行再一次的解碼,變為一個為止的訊息,這個訊息到達處理類NettyServerHandler已經變為亂碼,無法正常處理。

所以在一個系統中,只定義一個解碼器和一個編碼器,要處理多種不同型別的訊息是通過擴充套件訊息類來實現,即一個通用的訊息類可以表示各種型別的業務處理訊息。

綜合上面的分析,總結一下:

1.pipeline載入handler鏈的順序有要求,解碼器要在入境業務處理類前面;編碼器要在出境業務處理類前面;

2.業務處理類攔截的是解碼後為其類泛型中定義的java物件,解碼不為其泛型定義的java物件的,其不處理,向後傳播訊息;

3.一個系統中只定義一個解碼器和一個編碼器,定義多個解碼器和多個編碼器會產生不可預知的錯誤。

 

參考:https://blog.csdn.net/weihao_/article/details/72780444

https://blog.csdn.net/zxhoo/article/details/17264263