使用Netty自定義實現Dubbo
阿新 • • 發佈:2021-11-24
使用Netty自定義實現Dubbo
設計目標:
使用 Netty 實現一個簡單的 RPC 框架。
設計需求:
模仿 Dubbo,消費者和提供者共同約定介面和協議,消費者遠端呼叫提供者的服務,提供者返回一個字串,消費者列印提供者返回的資料。
設計說明:
網路通訊使用 Netty 4.1.20。
設計示圖:
程式碼示例:
公共介面 HelloService /** * 客戶端與伺服器端公共介面 * * @author LJT * @date 2021/11/23 13:42 */ public interface HelloService { String sayHello(String mes); }
HelloServiceImpl /** * 伺服器端實現公共介面 * * @author LJT * @date 2021/11/23 13:43 */ public class HelloServiceImpl implements HelloService { private static int count = 0; // 服務端實現公共介面,重寫裡面的方法 // 當收到客戶端的訊息時,返回響應的結果 public String sayHello(String mes) { System.out.println("收到了客戶端的訊息=" + mes);// 根據請求不同的mes,返回不同的響應結果 if (mes != null) { return "你好呀客戶端,我已經收到你的訊息【" + mes + "】第" + (++count) + "次"; } else { return "你好呀客戶端,我已經收到你的訊息了"; } } }
ServerBootstrap /** * ServerBootstrap 會啟動一個服務提供者,即 NettyServer * * @author LJT * @date 2021/11/23 13:51*/ public class ServerBootstrap { public static void main(String[] args) { // 啟動服務,繫結本機的地址與埠 NettyServer.startServer("127.0.0.1", 7000); } }
NettyServer import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 完成NettyServer的初始化和啟動 * * @author LJT * @date 2021/11/23 13:54 */ public class NettyServer { // 接收外來的呼叫傳參 public static void startServer(String hostName, int port) { startServer0(hostName, port); } // 初始化NettyServer並呼叫啟動服務 private static void startServer0(String hostName, int port) { // 服務端的兩個group,一個用來接收請求,一個用來處理邏輯 // 不寫引數,預設是 cpu核數 * 2 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); // 配置啟動引數 serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); // 自定義業務處理器 } }); // 回撥處理 ChannelFuture channelFuture = serverBootstrap.bind(hostName, port).sync(); System.out.println("伺服器啟動成功,開始提供服務~~~"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 最後關閉,防止浪費資源,要養成習慣 bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
NettyServerHandler import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * 自定義伺服器處理器 * * @author LJT * @date 2021/11/23 14:14 */ // 這裡繼承 ChannelInboundHandlerAdapter 介面卡 // 不需要再去管其它事情,只需要把精力集中重寫需要的方法,處理業務邏輯 public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 獲取客戶端傳送的訊息,列印訊息 System.out.println("msg=" + msg); // 客戶端在呼叫伺服器的 api 時,我們需要定義一個協議 // 比如,每次發訊息都必須是以某個字串開頭 “HelloService#hello#你好” // 在實際的開發中,類似二次解碼,提取出有用的資訊 if (msg.toString().startsWith(ClientBootstrap.providerName)) { String result = new HelloServiceImpl().sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
ClientBootstrap /** * 客戶端啟動引導 * * @author LJT * @date 2021/11/23 15:16 */ public class ClientBootstrap { // 這裡定義協議頭 public static final String providerName = "HelloService#hello#"; public static void main(String[] args) throws Exception { // 建立一個消費者 NettyClient customer = new NettyClient(); // 建立代理物件,在實際的開發中,使用 spring 進行管理 HelloService service = (HelloService) customer.getBean(HelloService.class, providerName); for (; ; ) { Thread.sleep(2 * 1000); // 通過代理物件呼叫服務提供者的方法(服務) String res = service.sayHello("你好 dubbo~"); System.out.println("呼叫的結果 res= " + res); } } }
NettyClient import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * NettyClient的建立與初始化 * * @author LJT * @date 2021/11/23 14:40 */ public class NettyClient { // 建立執行緒池 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; // 編寫方法使用代理模式,獲取一個代理物件 // 在開發中,使用 spring 獲取,不需要太關注於底層細節 public Object getBean(final Class<?> serivceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serivceClass}, (proxy, method, args) -> { System.out.println("(proxy, method, args) 進入...." + (++count) + " 次"); //{} 部分的程式碼,客戶端每呼叫一次 hello, 就會進入到該程式碼 if (client == null) { initClient(); } //設定要發給伺服器端的資訊 //providerName 協議頭 args[0] 就是客戶端呼叫api hello(???), 引數 client.setPara(providerName + args[0]); return executor.submit(client).get(); }); } //初始化客戶端 private static void initClient() { client = new NettyClientHandler(); // 建立EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); // 配置初始化啟動引數 bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } } ); try { // 連線請求的伺服器地址與埠 bootstrap.connect("127.0.0.1", 7000).sync(); } catch (Exception e) { e.printStackTrace(); }
// 注意這裡不需要關閉 group } }
NettyClientHandler import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.Callable; /** * 自定義客戶端處理器 * * @author LJT * @date 2021/11/23 14:24 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context;//上下文 private String result; //返回的結果 private String para; //客戶端呼叫方法時,傳入的引數 // 與伺服器的連線建立後,就會被呼叫, 這個方法是第一個被呼叫 // 確保它是活的 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" channelActive 被呼叫 "); context = ctx; //因為我們在其它方法會使用到 ctx } //收到伺服器的資料後,邏輯處理 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(" channelRead 被呼叫 "); result = msg.toString(); notify(); //喚醒等待的執行緒 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } // 被代理物件呼叫, 傳送資料給伺服器,-> wait -> 等待被喚醒(channelRead) // 注意這裡需要加上 synchronized 不然會報異常 @Override public synchronized Object call() throws Exception { System.out.println(" call1 被呼叫 "); context.writeAndFlush(para); //進行wait wait(); //等待channelRead 方法獲取到伺服器的結果後,喚醒 System.out.println(" call2 被呼叫 "); return result; //服務方返回的結果 } void setPara(String para) { System.out.println(" setPara "); this.para = para; } }