netty長連線實戰
阿新 • • 發佈:2018-12-30
1,準備好netty必須的jar
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.23.Final</version> </dependency> <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>0.6.12</version> </dependency>
2,服務端-Server
啟動服務方法:
public void satrtServer(int port) throws Exception { //處理和客戶端連線的執行緒組(構造器引數預設是空,執行緒數就是當前機器CPU的核心個數) EventLoopGroup acceptGroup = new NioEventLoopGroup(1); //處理客戶端與服務端寫入讀取的IO等業務的執行緒組 EventLoopGroup handlerIoGroup = new NioEventLoopGroup(1); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(acceptGroup, handlerIoGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //http緩衝區 .option(ChannelOption.SO_SNDBUF, 3 * 1024) //傳送到客戶端的最大長度 .option(ChannelOption.SO_RCVBUF, 3 * 1024) //接收客戶端資訊的最大長度 .option(ChannelOption.TCP_NODELAY, true) //無延遲推送 .childOption(ChannelOption.SO_KEEPALIVE, true) //服務端是childHandler .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ByteBuf byteBuf = Unpooled.copiedBUffer("
[email protected]$".getBytes("UTF-8")); //處理客戶端連續傳送流導致粘包問題,客戶端傳送的資訊需要[email protected]$代表發生結束 ch.pipeline().addLast(new DelimiterBasedFrameDecoder( 3 * 1024,byteBuf)); ch.pipeline().addLast(new ReadTimeoutHandler(10)); //超過10秒沒有讀取到客戶端傳送的資訊就關閉這個通道 ch.pipeline().addLast(new ServerHandler()); } }); //非同步阻塞 ChannelFuture f = serverBootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } finally { acceptGroup.shutdownGracefully(); handlerIoGroup .shutdownGracefully(); } }
3,實現服務端處理業務ServerHandler:
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf msgBuf = (ByteBuf) msg;
String gMsg = msgBuf.toString(CharsetUtil.UTF_8);
ctx.channel().writeAndFlush(Unpooled.copiedBuffer("收到資訊了".getBytes()));
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 異常處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}