1. 程式人生 > >netty長連線實戰

netty長連線實戰

1,準備好netty必須的jar

<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.0.23.Final</version>
</dependency>
<dependency>
   <groupId>org.msgpack</groupId>
   <artifactId>msgpack</artifactId>
   <version>0.6.12</version>
</dependency>

2,服務端-Server

啟動服務方法:



public void satrtServer(int port) throws Exception {
    //處理和客戶端連線的執行緒組(構造器引數預設是空,執行緒數就是當前機器CPU的核心個數)
    EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
    //處理客戶端與服務端寫入讀取的IO等業務的執行緒組
    EventLoopGroup handlerIoGroup = new NioEventLoopGroup(1);
    try {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(acceptGroup, handlerIoGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 1024) //http緩衝區
                 .option(ChannelOption.SO_SNDBUF, 3 * 1024) //傳送到客戶端的最大長度
                 .option(ChannelOption.SO_RCVBUF, 3 * 1024) //接收客戶端資訊的最大長度
                 .option(ChannelOption.TCP_NODELAY, true)   //無延遲推送
                 .childOption(ChannelOption.SO_KEEPALIVE, true)
                 //服務端是childHandler
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ByteBuf byteBuf = Unpooled.copiedBUffer("
[email protected]
$".getBytes("UTF-8")); //處理客戶端連續傳送流導致粘包問題,客戶端傳送的資訊需要[email protected]$代表發生結束 ch.pipeline().addLast(new DelimiterBasedFrameDecoder( 3 * 1024,byteBuf)); ch.pipeline().addLast(new ReadTimeoutHandler(10)); //超過10秒沒有讀取到客戶端傳送的資訊就關閉這個通道 ch.pipeline().addLast(new ServerHandler()); } }); //非同步阻塞 ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); handlerIoGroup .shutdownGracefully(); } }

3,實現服務端處理業務ServerHandler:

public class ServerHandler extends ChannelInboundHandlerAdapter {
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		ByteBuf msgBuf = (ByteBuf) msg;

        String gMsg = msgBuf.toString(CharsetUtil.UTF_8);
		ctx.channel().writeAndFlush(Unpooled.copiedBuffer("收到資訊了".getBytes()));
	}
	@Override
	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
		ctx.flush();
	}
	/**
	 * 異常處理
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
		cause.printStackTrace();
		ctx.close();
	}
}