netty實現TCP長連線
阿新 • • 發佈:2018-12-14
所用jar包
netty-all-4.1.30.Final.jar 密碼:rzwe
NettyConfig.java,存放連線的客戶端
1 import io.netty.channel.group.ChannelGroup; 2 import io.netty.channel.group.DefaultChannelGroup; 3 import io.netty.util.concurrent.GlobalEventExecutor; 4 5 public class NettyConfig { 6 7 /** 8 * 儲存每一個客戶端接入進來時的channel物件9 */ 10 public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 11 12 }
Server.java,netty配置資訊
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 importio.netty.channel.ChannelOption; 5 import io.netty.channel.ChannelPipeline; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.ServerSocketChannel; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel;11 12 public class Server { 13 private int port; 14 private ServerSocketChannel serverSocketChannel; 15 16 public Server(int port){ 17 this.port = port; 18 bind(); 19 } 20 21 private void bind() { 22 Thread thread = new Thread(new Runnable() { 23 @Override 24 public void run() { 25 //服務端要建立兩個group,一個負責接收客戶端的連線,一個負責處理資料傳輸 26 //連線處理group 27 EventLoopGroup boss = new NioEventLoopGroup(); 28 //事件處理group 29 EventLoopGroup worker = new NioEventLoopGroup(); 30 ServerBootstrap bootstrap = new ServerBootstrap(); 31 // 繫結處理group 32 bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) 33 //保持連線數 34 .option(ChannelOption.SO_BACKLOG, 300) 35 //有資料立即傳送 36 .option(ChannelOption.TCP_NODELAY, true) 37 //保持連線 38 .childOption(ChannelOption.SO_KEEPALIVE, true) 39 //處理新連線 40 .childHandler(new ChannelInitializer<SocketChannel>() { 41 @Override 42 protected void initChannel(SocketChannel sc) throws Exception { 43 // 增加任務處理 44 ChannelPipeline p = sc.pipeline(); 45 p.addLast( 46 // //使用了netty自帶的編碼器和解碼器 47 // new StringDecoder(), 48 // new StringEncoder(), 49 //心跳檢測,讀超時,寫超時,讀寫超時 50 //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS), 51 //自定義的處理器 52 new ServerHandler()); 53 } 54 }); 55 56 //繫結埠,同步等待成功 57 ChannelFuture future; 58 try { 59 future = bootstrap.bind(port).sync(); 60 if (future.isSuccess()) { 61 serverSocketChannel = (ServerSocketChannel) future.channel(); 62 System.out.println("服務端啟動成功,埠:"+port); 63 } else { 64 System.out.println("服務端啟動失敗!"); 65 } 66 67 //等待服務監聽埠關閉,就是由於這裡會將執行緒阻塞,導致無法傳送資訊,所以我這裡開了執行緒 68 future.channel().closeFuture().sync(); 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } 72 finally { 73 //優雅地退出,釋放執行緒池資源 74 boss.shutdownGracefully(); 75 worker.shutdownGracefully(); 76 } 77 } 78 }); 79 thread.start(); 80 } 81 82 public void sendMessage(Object msg){ 83 if(serverSocketChannel != null){ 84 serverSocketChannel.writeAndFlush(msg); 85 } 86 } 87 88 public static void main(String[] args) { 89 Server server = new Server(8088); 90 } 91 }
ServerHandler.java,業務處理
1 import io.netty.channel.ChannelHandlerContext; 2 import io.netty.channel.ChannelInboundHandlerAdapter; 3 4 public class ServerHandler extends ChannelInboundHandlerAdapter { 5 6 /** 7 * 客戶端與服務端建立連線的時候呼叫 8 */ 9 @Override 10 public void channelActive(ChannelHandlerContext ctx) throws Exception { 11 System.out.println("客戶端與服務端連線開始..."); 12 NettyConfig.group.add(ctx.channel()); 13 } 14 15 /** 16 * 客戶端與服務端斷開連線時呼叫 17 */ 18 @Override 19 public void channelInactive(ChannelHandlerContext ctx) throws Exception { 20 System.out.println("客戶端與服務端連線關閉..."); 21 NettyConfig.group.remove(ctx.channel()); 22 } 23 24 /** 25 * 服務端接收客戶端傳送過來的資料結束之後呼叫 26 */ 27 @Override 28 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 29 ctx.flush(); 30 System.out.println("資訊接收完畢..."); 31 } 32 33 /** 34 * 工程出現異常的時候呼叫 35 */ 36 @Override 37 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 38 cause.printStackTrace(); 39 ctx.close(); 40 } 41 42 /** 43 * 服務端處理客戶端websocket請求的核心方法,這裡接收了客戶端發來的資訊 44 */ 45 @Override 46 public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception { 47 System.out.println("接收到了:"+info); 48 ByteBuf buf = (ByteBuf) info; 49 byte[] req = new byte[buf.readableBytes()]; 50 buf.readBytes(req); 51 String body = new String(req, "UTF-8"); 52 System.out.println("接收客戶端資料:" + body); 53 ByteBuf pingMessage = Unpooled.buffer(); 54 pingMessage.writeBytes(req); 55 channelHandlerContext.writeAndFlush(pingMessage); 56 57 58 //服務端使用這個就能向 每個連線上來的客戶端群發訊息 59 //NettyConfig.group.writeAndFlush(info); 60 // Iterator<Channel> iterator = NettyConfig.group.iterator(); 61 // while(iterator.hasNext()){ 62 // //打印出所有客戶端的遠端地址 63 // System.out.println((iterator.next()).remoteAddress()); 64 // } 65 } 66 67 68 }
使用網路除錯助手進行連線測試 下載地址
https://www.wanpishe.top/detail?blogId=fc62fce2-020a-4815-8388-0903e4a54e1f