1. 程式人生 > 其它 >Netty:簡單使用

Netty:簡單使用

Netty是什麼東西

Netty是一個封裝很好的非同步事件驅動框架,讓我們快速的部署服務端和客戶端的網路應用,進行非同步IO通訊

1、什麼是IO通訊
IO就是input 和 output,是一種在兩臺主機、兩個程序或者兩個執行緒之間傳輸資料的方法
2、什麼是非同步
非同步和同步相對應,同步 舉個例子就是:a執行緒和b執行緒通訊時,a每次準備要讀取b的資料時,得在原地等b將資料傳過來,不能進行其他操作(b等a也一樣)
非同步 就是:執行緒a每次只需要看看執行緒b有沒有傳資料過來,有就讀取,沒有就執行其他操作。
  感覺就類似於:a和b兩個人要約會,同步情況下,先到約會地點的人得一直在原地等待還沒到的人
     非同步情況下,先到約會地點的人可以一邊跟路邊的美女(帥哥)搭訕,一邊看看對方到了沒
所以在同步情況下,服務端的一個執行緒只能管理一個連線,
  在非同步情況下,服務端的一個執行緒可以管理多個連線

Netty怎麼使用

Netty怎麼用取決於我們要用它做什麼

資料傳輸

資料傳輸我們要進行IO通訊是為了要傳輸資料
Netty將資料傳輸的方法封裝到了ChannelInboundHandlerAdapter中,繼承這個類然後實現裡面的方法就可以進行資料傳輸(應該有點介面卡模式)
下面程式碼中的兩個類就是進行資料傳輸的Handler,其中服務端在連線建立時傳送時間,客戶端收到資料後列印(來自官方文件)

承載我們要傳輸的資訊、進行傳輸的資料型別是ByteBuf型別,ByteBuf是一種引用計數的物件,所以我們使用完後,需要注意釋放物件。

 1 import io.netty.buffer.ByteBuf;
2 import io.netty.channel.ChannelFuture; 3 import io.netty.channel.ChannelFutureListener; 4 import io.netty.channel.ChannelHandlerContext; 5 import io.netty.channel.ChannelInboundHandlerAdapter; 6 7 public class TimeServerHandler extends ChannelInboundHandlerAdapter { 8 9 //在連線建立之後呼叫該方法
10 @Override 11 public void channelActive(ChannelHandlerContext ctx) throws Exception { 12 final ByteBuf time = ctx.alloc().buffer(4); // (2) 13 time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); 14 15 final ChannelFuture f = ctx.writeAndFlush(time); // (3) 16 f.addListener(new ChannelFutureListener() { 17 @Override 18 public void operationComplete(ChannelFuture future) { 19 if (f == future) { 20 ctx.close(); 21 } 22 } 23 }); 24 } 25 //捕捉資料傳輸過程中的異常 26 @Override 27 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 28 cause.printStackTrace(); 29 ctx.close(); 30 } 31 }
 1 import io.netty.buffer.ByteBuf;
 2 import io.netty.channel.ChannelHandlerContext;
 3 import io.netty.channel.ChannelInboundHandlerAdapter;
 4 
 5 import java.util.Date;
 6 
 7 public class TimeClientHandler extends ChannelInboundHandlerAdapter {
 8 
 9     //收到傳送過來的資料之後呼叫該方法
10     @Override
11     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
12 
13         ByteBuf m = (ByteBuf) msg; // (1)
14         try {
15             long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
16             System.out.println(new Date(currentTimeMillis));
17             ctx.close();
18         } finally {
19             m.release();
20         }
21     }
22 
23     @Override
24     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
25 
26         cause.printStackTrace();
27         ctx.close();
28     }
29 }

我們的目的,資料傳輸的操作已經有了,但現在只是知道了要傳輸什麼,然後還得建立連線來進行傳輸

建立連線

我們需要使用執行緒來建立連線(可能協程也可以)

我們不用自己建立執行緒池,因為Netty提供了執行緒組(EventLoopGroup類)的方式來建立執行緒

服務端一般會使用兩個執行緒組,一個用來接收連線,一個用來傳輸資料;客戶端使用一個執行緒組用來傳輸資料

現在需要有一個東西把執行緒組和傳輸資料的Handler組合起來,為了方便就使用了Bootstrap類,

服務端是ServerBootstrap,客戶端是Bootstrap,因為服務端有兩個執行緒組,所以得進行區分

下面是程式碼中建立連線的類,加上面的Handler就是Netty連線完整的(客戶端連線服務端獲取時間的)demo

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.*;
 3 import io.netty.channel.nio.NioEventLoopGroup;
 4 import io.netty.channel.socket.SocketChannel;
 5 import io.netty.channel.socket.nio.NioServerSocketChannel;
 6 
 7 public class TimeServer {
 8 
 9     private int port;
10 
11     public TimeServer(int port){
12         this.port = port;
13     }
14 
15     public void run(){
16 
17         //稱為父執行緒組
18         EventLoopGroup bossGroup = new NioEventLoopGroup();
19         //子執行緒組
20         EventLoopGroup workerGroup = new NioEventLoopGroup();
21 
22         try{
23             //也可以使用channel直接建立連線
24             ServerBootstrap serverBoostrap = new ServerBootstrap();
25             serverBoostrap.group(bossGroup,workerGroup)
26                     //通道型別有NioServerSocketChannel、OioServerSocketChannel、NioSctpServerChannel(linux平臺)
27                     //設定服務端通道實現型別
28                     .channel(NioServerSocketChannel.class)
29                     .childHandler(new ChannelInitializer<SocketChannel>() {
30                         @Override
31                         protected void initChannel(SocketChannel socketChannel) throws Exception {
32 
33 //                            ChannelPipeline用於管理Handler
34                             socketChannel.pipeline().addLast(new TimeServerHandler());
35                         }
36                     })
37                     //設定執行緒佇列的連線個數
38                     .option(ChannelOption.SO_BACKLOG,128)
39                     //對子執行緒組的配置,設定保持活動連線狀態,預設為false,會主動探測空閒連線的有效性
40                     .childOption(ChannelOption.SO_KEEPALIVE,true);
41 
42             //ChannelFuture是指尚未執行的操作,因為netty是非同步操作
43             ChannelFuture channelFuture = serverBoostrap.bind(port).sync();
44             //可以得到channle的各種狀態
45             channelFuture.channel().isOpen();
46             channelFuture.channel().isActive();
47 
48             //直到伺服器socket關閉的時候執行。
49             channelFuture.channel().closeFuture().sync();
50             
51         } catch (InterruptedException e) {
52             e.printStackTrace();
53         } finally {
54             //關閉執行緒組
55             bossGroup.shutdownGracefully();
56             workerGroup.shutdownGracefully();
57         }
58 
59     }
60 
61     public static void main(String[] args) {
62         TimeServer timeServer = new TimeServer(8989);
63         timeServer.run();
64     }
65 
66 
67 }
 1 import io.netty.bootstrap.Bootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import io.netty.channel.ChannelOption;
 5 import io.netty.channel.EventLoopGroup;
 6 import io.netty.channel.nio.NioEventLoopGroup;
 7 import io.netty.channel.socket.SocketChannel;
 8 import io.netty.channel.socket.nio.NioSocketChannel;
 9 
10 public class TimeClient {
11 
12     public static void main(String[] args) {
13         String host = "127.0.0.1";
14         int port = Integer.parseInt("8989");
15         EventLoopGroup workerGroup = new NioEventLoopGroup();
16 
17         try {
18             Bootstrap b = new Bootstrap();
19             b.group(workerGroup);
20             b.channel(NioSocketChannel.class);
21             b.option(ChannelOption.SO_KEEPALIVE, true);
22             b.handler(new ChannelInitializer<SocketChannel>() {
23                 @Override
24                 public void initChannel(SocketChannel ch) throws Exception {
25                     ch.pipeline().addLast(new TimeClientHandler());
26                 }
27             });
28 
29 
30             ChannelFuture f = b.connect(host, port).sync();
31 
32 
33             f.channel().closeFuture().sync();
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         } finally {
37             workerGroup.shutdownGracefully();
38         }
39     }
40 }