Netty學習之Demo搭建
阿新 • • 發佈:2021-07-31
如下所示,我們寫一個簡單的Netty Demo,實現客戶端與服務端進行通訊。
1、Netty 服務端啟動類
/** * (1)、 初始化用於Acceptor的主"執行緒池"以及用於I/O工作的從"執行緒池"; * (2)、 初始化ServerBootstrap例項, 此例項是netty服務端應用開發的入口; * (3)、 通過ServerBootstrap的group方法,設定(1)中初始化的主從"執行緒池"; * (4)、 指定通道channel的型別,由於是服務端,故而是NioServerSocketChannel; * (5)、 設定ServerSocketChannel的處理器 * (6)、 設定子通道也就是SocketChannel的處理器, 其內部是實際業務開發的"主戰場" * (8)、 配置子通道也就是SocketChannel的選項 * (9)、 繫結並偵聽某個埠 */ public class SimpleNettyServer { public void bind(int port) throws Exception { // 伺服器端應用程式使用兩個NioEventLoopGroup建立兩個EventLoop的組,EventLoop這個相當於一個處理執行緒,是Netty接收請求和處理IO請求的執行緒。 // 主執行緒組, 用於接受客戶端的連線,但是不做任何處理,跟老闆一樣,不做事 EventLoopGroup bossGroup = new NioEventLoopGroup(); // 從執行緒組, 當boss接受連線並註冊被接受的連線到worker時,處理被接受連線的流量。 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // netty伺服器啟動類的建立, 輔助工具類,用於伺服器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * 使用了多少執行緒以及如何將它們對映到建立的通道取決於EventLoopGroup實現,甚至可以通過建構函式進行配置。 * 設定迴圈執行緒組,前者用於處理客戶端連線事件,後者用於處理網路IO(server使用兩個引數這個) * public ServerBootstrap group(EventLoopGroup group) * public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) */ serverBootstrap.group(bossGroup, workerGroup) //繫結兩個執行緒組 // 用於構造socketchannel工廠 .channel(NioServerSocketChannel.class) //指定NIO的模式 /** * @Description: 初始化器,channel註冊後,會執行裡面的相應的初始化方法,傳入自定義客戶端Handle(服務端在這裡操作) * @Override protected void initChannel(SocketChannel channel) throws Exception { // 通過SocketChannel去獲得對應的管道 ChannelPipeline pipeline = channel.pipeline(); // 通過管道,新增handler pipeline.addLast("nettyServerOutBoundHandler", new NettyServerOutBoundHandler()); pipeline.addLast("nettyServerHandler", new NettyServerHandler()); } * 子處理器也可以通過下面的內部方法來實現。 */ .childHandler(new ChannelInitializer<SocketChannel>() { // 子處理器,用於處理workerGroup protected void initChannel(SocketChannel socketChannel) throws Exception { // socketChannel.pipeline().addLast(new NettyServerOutBoundHandler()); socketChannel.pipeline().addLast(new SimpleNettyServerHandler()); } }); // 啟動server,繫結埠,開始接收進來的連線,設定8088為啟動的埠號,同時啟動方式為同步 ChannelFuture channelFuture = serverBootstrap.bind(8088).sync(); System.out.println("server start"); // 監聽關閉的channel,等待伺服器 socket 關閉 。設定位同步方式 channelFuture.channel().closeFuture().sync(); } finally { //退出執行緒組 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; new netty.server.SimpleNettyServer().bind(port); } }
2、Netty 服務端處理類Handler
public class SimpleNettyServerHandler extends ChannelInboundHandlerAdapter { /** * 本方法用於讀取客戶端傳送的資訊 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleNettyServerHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] bytesMsg = new byte[result.readableBytes()]; // msg中儲存的是ByteBuf型別的資料,把資料讀取到byte[]中 result.readBytes(bytesMsg); String resultStr = new String(bytesMsg); // 接收並列印客戶端的資訊 System.out.println("Client said:" + resultStr); // 釋放資源,這行很關鍵 result.release(); // 向客戶端傳送訊息 String response = "hello client!"; // 在當前場景下,傳送的資料必須轉換成ByteBuf陣列 ByteBuf encoded = ctx.alloc().buffer(4 * response.length()); encoded.writeBytes(response.getBytes()); ctx.write(encoded); ctx.flush(); } /** * 本方法用作處理異常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 當出現異常就關閉連線 cause.printStackTrace(); ctx.close(); } /** * 資訊獲取完畢後操作 * * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
3、Netty 客戶端啟動類
public class SimpleNettyClient { public void connect(String host, int port) throws Exception { EventLoopGroup worker = new NioEventLoopGroup(); try { // 客戶端啟動類程式 Bootstrap bootstrap = new Bootstrap(); /** *EventLoop的組 */ bootstrap.group(worker); /** * 用於構造socketchannel工廠 */ bootstrap.channel(NioSocketChannel.class); /**設定選項 * 引數:Socket的標準引數(key,value),可自行百度 保持呼吸,不要斷氣! * */ bootstrap.option(ChannelOption.SO_KEEPALIVE, true); /** * 自定義客戶端Handle(客戶端在這裡搞事情) */ bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new SimpleNettyClientHandler()); } }); /** 開啟客戶端監聽,連線到遠端節點,阻塞等待直到連線完成*/ ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); /**阻塞等待資料,直到channel關閉(客戶端關閉)*/ channelFuture.channel().closeFuture().sync(); } finally { worker.shutdownGracefully(); } } public static void main(String[] args) throws Exception { SimpleNettyClient client = new SimpleNettyClient(); client.connect("127.0.0.1", 8088); } }
4、客戶端處理類Handler
public class SimpleNettyClientHandler extends ChannelInboundHandlerAdapter { /** * 本方法用於接收服務端傳送過來的訊息 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("SimpleClientHandler.channelRead"); ByteBuf result = (ByteBuf) msg; byte[] result1 = new byte[result.readableBytes()]; result.readBytes(result1); System.out.println("Server said:" + new String(result1)); result.release(); } /** * 本方法用於處理異常 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 當出現異常就關閉連線 cause.printStackTrace(); ctx.close(); } /** * 本方法用於向服務端傳送資訊 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { String msg = "hello Server!"; ByteBuf encoded = ctx.alloc().buffer(4 * msg.length()); encoded.writeBytes(msg.getBytes()); ctx.write(encoded); ctx.flush(); } }
先啟動服務端,然後啟動客戶端,可分別在Console得到以下輸出:
服務端:
server start SimpleNettyServerHandler.channelRead Client said:hello Server!
客戶端:
SimpleClientHandler.channelRead Server said:hello client!
由此,一個Netty的簡單Demo即搭建完成。