Netty(1):第一個netty程式
為什麼選擇Netty
netty是業界最流行的NIO框架之一,它的健壯型,功能,效能,可定製性和可擴充套件性都是首屈一指的,Hadoop的RPC框架Avro就使用了netty作為底層的通訊框架,此外netty在網際網路,大資料,網路遊戲,企業應用,電信軟體等眾多行業都得到了成功的商業應用。正因為以上的一些特性,使得netty已經成為java NIO程式設計的首選框架。
構建netty開發環境
其實使用netty很簡單,直接將其jar包引入到工程中即可使用。 去 http://netty.io/網站上下載最新版本的jar包(由於官網上netty5已經被廢棄,但是這裡仍然使用netty5進行開發, 可以考慮從csnd下載),我這裡下載的為:netty-5.0.0.Alpha1.tar.bz2。這其實是一個壓縮檔案,解壓這個檔案,取裡面的所有類集合到一起的那個jar包netty-all-5.0.0.Alpha1.jar即可。另外還需要注意的是,我這裡使用的jdk版本是1.8。
第一個netty程式 這裡利用netty來實現一個時鐘的小程式,伺服器端接收特定的指令然後將伺服器時間返回給客戶端,客戶端按照一定的時間間隔往伺服器端傳送命令。
1 package com.rampage.netty.time; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelOption; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 12 /** 13 * 時鐘程式的伺服器端 14 * @author zyq 15 * 16 */ 17 public class TimeServer { 18 19 public static void main(String[] args) throws Exception { 20 new TimeServer().bind(8080); 21 } 22 23 public void bind(int port) throws Exception { 24 // 配置伺服器的NIO執行緒組 25 EventLoopGroup bossGroup = new NioEventLoopGroup(); 26 EventLoopGroup workerGroup = new NioEventLoopGroup(); 27 28 try { 29 ServerBootstrap bootStrap = new ServerBootstrap(); 30 31 // 進行鏈式呼叫(每一次呼叫的返回結果都是ServerBootstrap) 32 // group帶兩個引數第一個表示給父(acceptor)用的EventExecutorGroup(其實就是執行緒池) 33 // 第二個引數表示子(client)執行緒池 34 // channel方法可以帶一個ServerChannel類來建立進行IO操作的通道。 35 // option方法給Channel定製對應的選項 36 // childHandler方法用來處理Channel中的請求 37 bootStrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) 38 .option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChildChannelHandler()); 39 40 // 繫結埠,等待同步成功 41 // bind方法返回一個ChannelFuture類,就是相當於繫結埠並且建立一個新的channel 42 // sync方法會等待ChannelFuture的處理結束 43 ChannelFuture future = bootStrap.bind(port).sync(); 44 45 // 等待伺服器監聽埠關閉 46 future.channel().closeFuture().sync(); 47 } finally { 48 // 優雅地退出,釋放執行緒資源 49 bossGroup.shutdownGracefully(); 50 workerGroup.shutdownGracefully(); 51 } 52 } 53 54 private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { 55 56 @Override 57 protected void initChannel(SocketChannel arg0) throws Exception { 58 arg0.pipeline().addLast(new TimeServerHandler()); 59 } 60 61 } 62 }
1 package com.rampage.netty.time; 2 3 import java.util.Date; 4 5 import io.netty.buffer.ByteBuf; 6 import io.netty.buffer.Unpooled; 7 import io.netty.channel.ChannelHandlerAdapter; 8 import io.netty.channel.ChannelHandlerContext; 9 10 /** 11 * 時間伺服器的處理類,只有netty5中的ChannelHandlerAdapter中才有ChannelRead和ChannelReadComplete方法。 12 * @author zyq 13 * 14 */ 15 public class TimeServerHandler extends ChannelHandlerAdapter { 16 @Override 17 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 18 // netty中的ByteBuf類相當於jdk中的ByteBuffer類,但是功能更加強大 19 ByteBuf buf = (ByteBuf) msg; 20 21 // readableBytes返回緩衝區可讀的位元組數 22 byte[] req = new byte[buf.readableBytes()]; 23 24 // 將緩衝區的位元組數複製到新的位元組陣列中去 25 buf.readBytes(req); 26 27 // 根據客戶端傳來的資訊得到應答資訊 28 String body = new String(req, "UTF-8"); 29 System.out.println("The time server receive order:" + body); 30 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; 31 32 // 給客戶端的迴應 33 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes()); 34 35 // 為了防止頻繁喚醒Selector進行訊息傳送,Netty的write方法並不直接將訊息寫入到SocketChannel中,而是把訊息放入到緩衝陣列 36 ctx.write(resp); 37 } 38 39 @Override 40 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 41 // 將放到緩衝陣列中的訊息寫入到SocketChannel中去 42 ctx.flush(); 43 } 44 45 @Override 46 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 47 ctx.close(); 48 } 49 50 51 }
1 package com.rampage.netty.time;
2
3 import io.netty.bootstrap.Bootstrap;
4 import io.netty.channel.ChannelFuture;
5 import io.netty.channel.ChannelInitializer;
6 import io.netty.channel.ChannelOption;
7 import io.netty.channel.EventLoopGroup;
8 import io.netty.channel.nio.NioEventLoopGroup;
9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioSocketChannel;
11
12 /**
13 * 時鐘程式的客戶端
14 * @author zyq
15 *
16 */
17 public class TimeClient {
18
19 public static void main(String[] args) throws Exception {
20 new TimeClient().connect("127.0.0.1", 8080);
21 }
22
23 public void connect(String host, int port) throws Exception {
24 // 配置客戶端NIO執行緒池
25 EventLoopGroup group = new NioEventLoopGroup();
26
27 Bootstrap strap = new Bootstrap();
28 try {
29 // 這裡用了匿名內部類,各個函式的含義同Server端
30 strap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
31 .handler(new ChannelInitializer<SocketChannel>() {
32
33 @Override
34 protected void initChannel(SocketChannel arg0) throws Exception {
35 arg0.pipeline().addLast(new TimeClientHandler());
36 }
37
38 });
39
40 // 發起非同步連線操作
41 ChannelFuture future = strap.connect(host, port).sync();
42
43 // 等待客戶端關閉(注意呼叫的是closeFuture如果直接呼叫close會立馬關閉)
44 future.channel().closeFuture().sync();
45 } finally {
46 // 優雅的關閉
47 group.shutdownGracefully();
48 }
49 }
50 }
1 package com.rampage.netty.time;
2
3 import java.util.logging.Logger;
4
5 import io.netty.buffer.ByteBuf;
6 import io.netty.buffer.Unpooled;
7 import io.netty.channel.ChannelHandlerAdapter;
8 import io.netty.channel.ChannelHandlerContext;
9
10 public class TimeClientHandler extends ChannelHandlerAdapter {
11
12 private static final Logger LOGGER = Logger.getLogger(TimeClientHandler.class.getName());
13
14 private final ByteBuf firstMsg;
15
16 public TimeClientHandler() {
17 byte[] req = "QUERY TIME ORDER".getBytes();
18 firstMsg = Unpooled.buffer(req.length);
19 firstMsg.writeBytes(req);
20 }
21
22 /**
23 * channel連通之後的處理
24 */
25 @Override
26 public void channelActive(ChannelHandlerContext ctx) throws Exception {
27 ctx.writeAndFlush(firstMsg);
28 }
29
30 @Override
31 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
32 ByteBuf buf = (ByteBuf) msg;
33 byte[] resp = new byte[buf.readableBytes()];
34 buf.readBytes(resp);
35
36 String body = new String(resp, "UTF-8");
37 System.out.println("Now is:" + body);
38
39 // 兩秒鐘後繼續向伺服器端傳送訊息
40 Thread.sleep(2000);
41 byte[] req = "QUERY TIME ORDER".getBytes();
42 ByteBuf sendMsg = Unpooled.buffer(req.length);
43 sendMsg.writeBytes(req);
44 ctx.writeAndFlush(sendMsg);
45 }
46
47 @Override
48 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
49 LOGGER.warning("Unexpected exception from downstream:" + cause.getMessage());
50 ctx.close();
51 }
52
53
54 }
伺服器端的執行結果如下:
The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER The time server receive order:QUERY TIME ORDER
...
客戶端的執行結果如下:
Now is:Wed Aug 03 05:55:30 PDT 2016 Now is:Wed Aug 03 05:55:33 PDT 2016 Now is:Wed Aug 03 05:55:35 PDT 2016 Now is:Wed Aug 03 05:55:37 PDT 2016
...
可以發現通過Netty框架來實現NIO極大的簡化了編碼和維護難度。程式碼調理更加清晰。
TimeClientHandler