SpringBoot使用Netty實現遠端呼叫的示例
前言
眾所周知我們在進行網路連線的時候,建立套接字連線是一個非常消耗效能的事情,特別是在分散式的情況下,用執行緒池去保持多個客戶端連線,是一種非常消耗執行緒的行為。那麼我們該通過什麼技術去解決上述的問題呢,那麼就不得不提一個網路連線的利器——Netty
.
正文 Netty
Netty是一個NIO
客戶端伺服器框架:
- 它可快速輕鬆地開發網路應用程式,例如協議伺服器和客戶端。
- 它極大地簡化和簡化了網路程式設計,例如
TCP
和UDP
套接字伺服器。
NIO
是一種非阻塞IO
,它具有以下的特點
- 單執行緒可以連線多個客戶端。
- 選擇器可以實現但執行緒管理多個
Channel
,新建的通道都要向選擇器註冊。 - 一個
SelectionKey
鍵表示了一個特定的通道物件和一個特定的選擇器物件之間的註冊關係。 selector
進行select()
操作可能會產生阻塞,但是可以設定阻塞時間,並且可以用wakeup()
喚醒selector
,所以NIO
是非阻塞IO
。
Netty模型selector模式
它相對普通NIO
的在效能上有了提升,採用了:
NIO
採用多執行緒的方式可以同時使用多個selector
- 通過繫結多個埠的方式,使得一個
selector
可以同時註冊多個ServerSocketServer
- 單個執行緒下只能有一個
selector
,用來實現Channel
的匹配及複用
半包問題
TCP/IP
在傳送訊息的時候,可能會拆包,這就導致接收端無法知道什麼時候收到的資料是一個完整的資料。在傳統的BIO
中在讀取不到資料時會發生阻塞,但是NIO
不會。為了解決NIO
的半包問題,Netty
在Selector
模型的基礎上,提出了reactor
模式,從而解決客戶端請求在服務端不完整的問題。
netty模型reactor模式
在selector
的基礎上解決了半包問題。
上圖,簡單地可以描述為"boss接活,讓work幹":manReactor
用來接收請求(會與客戶端進行握手驗證),而subReactor
用來處理請求(不與客戶端直接連線)。
SpringBoot使用Netty實現遠端呼叫
maven依賴
<!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.2</version> <optional>true</optional> </dependency> <!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.17.Final</version> </dependency>
服務端部分
NettyServer.java:服務啟動監聽器
@Slf4j public class NettyServer { public void start() { InetSocketAddress socketAddress = new InetSocketAddress("127.0.0.1",8082); //new 一個主執行緒組 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //new 一個工作執行緒組 EventLoopGroup workGroup = new NioEventLoopGroup(200); ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()) .localAddress(socketAddress) //設定佇列大小 .option(ChannelOption.SO_BACKLOG,1024) // 兩小時內沒有資料的通訊時,TCP會自動傳送一個活動探測資料報文 .childOption(ChannelOption.SO_KEEPALIVE,true); //繫結埠,開始接收進來的連線 try { ChannelFuture future = bootstrap.bind(socketAddress).sync(); log.info("伺服器啟動開始監聽埠: {}",socketAddress.getPort()); future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("伺服器開啟失敗",e); } finally { //關閉主執行緒組 bossGroup.shutdownGracefully(); //關閉工作執行緒組 workGroup.shutdownGracefully(); } } }
ServerChannelInitializer.java:netty
服務初始化器
/** * netty服務初始化器 **/ public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //新增編解碼 socketChannel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); socketChannel.pipeline().addLast(new NettyServerHandler()); } }
NettyServerHandler.java:netty
服務端處理器
/** * netty服務端處理器 **/ @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 客戶端連線會觸發 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("Channel active......"); } /** * 客戶端發訊息會觸發 */ @Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception { log.info("伺服器收到訊息: {}",msg.toString()); ctx.write("你也好哦"); ctx.flush(); } /** * 發生異常觸發 */ @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
RpcServerApp.java:SpringBoot
啟動類
/** * 啟動類 * */ @Slf4j @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}) public class RpcServerApp extends SpringBootServletInitializer { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(RpcServerApp.class); } /** * 專案的啟動方法 * * @param args */ public static void main(String[] args) { SpringApplication.run(RpcServerApp.class,args); //開啟Netty服務 NettyServer nettyServer =new NettyServer (); nettyServer.start(); log.info("======服務已經啟動========"); } }
客戶端部分
NettyClientUtil.java:NettyClient
工具類
/** * Netty客戶端 **/ @Slf4j public class NettyClientUtil { public static ResponseResult helloNetty(String msg) { NettyClientHandler nettyClientHandler = new NettyClientHandler(); EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap() .group(group) //該引數的作用就是禁止使用Nagle演算法,使用於小資料即時傳輸 .option(ChannelOption.TCP_NODELAY,true) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("decoder",new StringDecoder()); socketChannel.pipeline().addLast("encoder",new StringEncoder()); socketChannel.pipeline().addLast(nettyClientHandler); } }); try { ChannelFuture future = bootstrap.connect("127.0.0.1",8082).sync(); log.info("客戶端傳送成功...."); //傳送訊息 future.channel().writeAndFlush(msg); // 等待連線被關閉 future.channel().closeFuture().sync(); return nettyClientHandler.getResponseResult(); } catch (Exception e) { log.error("客戶端Netty失敗",e); throw new BusinessException(CouponTypeEnum.OPERATE_ERROR); } finally { //以一種優雅的方式進行執行緒退出 group.shutdownGracefully(); } } }
NettyClientHandler.java:客戶端處理器
/** * 客戶端處理器 **/ @Slf4j @Setter @Getter public class NettyClientHandler extends ChannelInboundHandlerAdapter { private ResponseResult responseResult; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端Active ....."); } @Override public void channelRead(ChannelHandlerContext ctx,Object msg) throws Exception { log.info("客戶端收到訊息: {}",msg.toString()); this.responseResult = ResponseResult.success(msg.toString(),CouponTypeEnum.OPERATE_SUCCESS.getCouponTypeDesc()); ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
驗證
測試介面
@RestController @Slf4j public class UserController { @PostMapping("/helloNetty") @MethodLogPrint public ResponseResult helloNetty(@RequestParam String msg) { return NettyClientUtil.helloNetty(msg); } }
訪問測試介面
服務端列印資訊
客戶端列印資訊
原始碼
專案原始碼可從的我的github中獲取:github原始碼地址
到此這篇關於SpringBoot使用Netty實現遠端呼叫的示例的文章就介紹到這了,更多相關SpringBoot Netty遠端呼叫內容請搜尋我們以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援我們!