Netty:簡單使用
Netty是什麼東西
Netty是一個封裝很好的非同步事件驅動框架,讓我們快速的部署服務端和客戶端的網路應用,進行非同步IO通訊。
1、什麼是IO通訊
IO就是input 和 output,是一種在兩臺主機、兩個程序或者兩個執行緒之間傳輸資料的方法
2、什麼是非同步
非同步和同步相對應,同步 舉個例子就是:a執行緒和b執行緒通訊時,a每次準備要讀取b的資料時,得在原地等b將資料傳過來,不能進行其他操作(b等a也一樣)
非同步 就是:執行緒a每次只需要看看執行緒b有沒有傳資料過來,有就讀取,沒有就執行其他操作。
感覺就類似於:a和b兩個人要約會,同步情況下,先到約會地點的人得一直在原地等待還沒到的人
非同步情況下,先到約會地點的人可以一邊跟路邊的美女(帥哥)搭訕,一邊看看對方到了沒
所以在同步情況下,服務端的一個執行緒只能管理一個連線,
在非同步情況下,服務端的一個執行緒可以管理多個連線
Netty怎麼使用
Netty怎麼用取決於我們要用它做什麼
資料傳輸
資料傳輸我們要進行IO通訊是為了要傳輸資料
Netty將資料傳輸的方法封裝到了ChannelInboundHandlerAdapter中,繼承這個類然後實現裡面的方法就可以進行資料傳輸(應該有點介面卡模式)
下面程式碼中的兩個類就是進行資料傳輸的Handler,其中服務端在連線建立時傳送時間,客戶端收到資料後列印(來自官方文件)
承載我們要傳輸的資訊、進行傳輸的資料型別是ByteBuf型別,ByteBuf是一種引用計數的物件,所以我們使用完後,需要注意釋放物件。
1 import io.netty.buffer.ByteBuf;2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelFutureListener; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 7 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 8 9 //在連線建立之後呼叫該方法10 @Override 11 public void channelActive(ChannelHandlerContext ctx) throws Exception { 12 final ByteBuf time = ctx.alloc().buffer(4); // (2) 13 time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); 14 15 final ChannelFuture f = ctx.writeAndFlush(time); // (3) 16 f.addListener(new ChannelFutureListener() { 17 @Override 18 public void operationComplete(ChannelFuture future) { 19 if (f == future) { 20 ctx.close(); 21 } 22 } 23 }); 24 } 25 //捕捉資料傳輸過程中的異常 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 28 cause.printStackTrace(); 29 ctx.close(); 30 } 31 }
1 import io.netty.buffer.ByteBuf; 2 import io.netty.channel.ChannelHandlerContext; 3 import io.netty.channel.ChannelInboundHandlerAdapter; 4 5 import java.util.Date; 6 7 public class TimeClientHandler extends ChannelInboundHandlerAdapter { 8 9 //收到傳送過來的資料之後呼叫該方法 10 @Override 11 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 12 13 ByteBuf m = (ByteBuf) msg; // (1) 14 try { 15 long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; 16 System.out.println(new Date(currentTimeMillis)); 17 ctx.close(); 18 } finally { 19 m.release(); 20 } 21 } 22 23 @Override 24 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 25 26 cause.printStackTrace(); 27 ctx.close(); 28 } 29 }
我們的目的,資料傳輸的操作已經有了,但現在只是知道了要傳輸什麼,然後還得建立連線來進行傳輸
建立連線
我們需要使用執行緒來建立連線(可能協程也可以)
我們不用自己建立執行緒池,因為Netty提供了執行緒組(EventLoopGroup類)的方式來建立執行緒
服務端一般會使用兩個執行緒組,一個用來接收連線,一個用來傳輸資料;客戶端使用一個執行緒組用來傳輸資料
現在需要有一個東西把執行緒組和傳輸資料的Handler組合起來,為了方便就使用了Bootstrap類,
服務端是ServerBootstrap,客戶端是Bootstrap,因為服務端有兩個執行緒組,所以得進行區分
下面是程式碼中建立連線的類,加上面的Handler就是Netty連線完整的(客戶端連線服務端獲取時間的)demo
1 import io.netty.bootstrap.ServerBootstrap; 2 import io.netty.channel.*; 3 import io.netty.channel.nio.NioEventLoopGroup; 4 import io.netty.channel.socket.SocketChannel; 5 import io.netty.channel.socket.nio.NioServerSocketChannel; 6 7 public class TimeServer { 8 9 private int port; 10 11 public TimeServer(int port){ 12 this.port = port; 13 } 14 15 public void run(){ 16 17 //稱為父執行緒組 18 EventLoopGroup bossGroup = new NioEventLoopGroup(); 19 //子執行緒組 20 EventLoopGroup workerGroup = new NioEventLoopGroup(); 21 22 try{ 23 //也可以使用channel直接建立連線 24 ServerBootstrap serverBoostrap = new ServerBootstrap(); 25 serverBoostrap.group(bossGroup,workerGroup) 26 //通道型別有NioServerSocketChannel、OioServerSocketChannel、NioSctpServerChannel(linux平臺) 27 //設定服務端通道實現型別 28 .channel(NioServerSocketChannel.class) 29 .childHandler(new ChannelInitializer<SocketChannel>() { 30 @Override 31 protected void initChannel(SocketChannel socketChannel) throws Exception { 32 33 // ChannelPipeline用於管理Handler 34 socketChannel.pipeline().addLast(new TimeServerHandler()); 35 } 36 }) 37 //設定執行緒佇列的連線個數 38 .option(ChannelOption.SO_BACKLOG,128) 39 //對子執行緒組的配置,設定保持活動連線狀態,預設為false,會主動探測空閒連線的有效性 40 .childOption(ChannelOption.SO_KEEPALIVE,true); 41 42 //ChannelFuture是指尚未執行的操作,因為netty是非同步操作 43 ChannelFuture channelFuture = serverBoostrap.bind(port).sync(); 44 //可以得到channle的各種狀態 45 channelFuture.channel().isOpen(); 46 channelFuture.channel().isActive(); 47 48 //直到伺服器socket關閉的時候執行。 49 channelFuture.channel().closeFuture().sync(); 50 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 } finally { 54 //關閉執行緒組 55 bossGroup.shutdownGracefully(); 56 workerGroup.shutdownGracefully(); 57 } 58 59 } 60 61 public static void main(String[] args) { 62 TimeServer timeServer = new TimeServer(8989); 63 timeServer.run(); 64 } 65 66 67 }
1 import io.netty.bootstrap.Bootstrap; 2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelOption; 5 import io.netty.channel.EventLoopGroup; 6 import io.netty.channel.nio.NioEventLoopGroup; 7 import io.netty.channel.socket.SocketChannel; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 10 public class TimeClient { 11 12 public static void main(String[] args) { 13 String host = "127.0.0.1"; 14 int port = Integer.parseInt("8989"); 15 EventLoopGroup workerGroup = new NioEventLoopGroup(); 16 17 try { 18 Bootstrap b = new Bootstrap(); 19 b.group(workerGroup); 20 b.channel(NioSocketChannel.class); 21 b.option(ChannelOption.SO_KEEPALIVE, true); 22 b.handler(new ChannelInitializer<SocketChannel>() { 23 @Override 24 public void initChannel(SocketChannel ch) throws Exception { 25 ch.pipeline().addLast(new TimeClientHandler()); 26 } 27 }); 28 29 30 ChannelFuture f = b.connect(host, port).sync(); 31 32 33 f.channel().closeFuture().sync(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } finally { 37 workerGroup.shutdownGracefully(); 38 } 39 } 40 }