1. 程式人生 > 其它 >Netty實現簡單DubboRpc

Netty實現簡單DubboRpc

Netty實現簡單DubboRpc

1. RPC 基本介紹

  1. RPC(Remote Procedure Call)—遠端過程呼叫,是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計

  2. 兩個或多個應用程式都分佈在不同的伺服器上,它們之間的呼叫都像是本地方法呼叫一樣(如圖)

  3. 常見的 RPC 框架有:比較知名的如阿里的 DubboGooglegRPCGo 語言的 rpcxApachethriftSpring 旗下的 SpringCloud

2. RPC 呼叫流程圖

3. 自己實現dubboRpc

  1. 建立一個介面,定義抽象方法。用於消費者和提供者之間的約定。
  2. 建立一個提供者,該類需要監聽消費者的請求,並按照約定返回資料。
  3. 建立一個消費者,該類需要透明的呼叫自己不存在的方法,內部需要使用 Netty 請求提供者返回資料

3.1 通用介面

public interface HelloService {
    
    String hello(String mes);

    String test(String mes);
}

3.2 伺服器端實現

  1. 介面實現類

    public class HelloServiceImpl implements HelloService {
        private static int count = 0;
    
        //當有消費方呼叫該方法時, 就返回一個結果
        @Override
        public String hello(String mes) {
            System.out.println("收到客戶端訊息=" + mes);
            return "hello";
        }
    
        @Override
        public String test(String mes) {
            return "返回測試訊息";
        }
    }
    
  2. 服務處理器

    public class NettyServerHandler  extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //獲取客戶端傳送的訊息,並呼叫服務
            System.out.println("msg=" + msg);
            //客戶端在呼叫伺服器的api 時,我們需要定義一個協議
            //呼叫的是hello方法,就執行hello方法,返回給客戶端
            if (msg.toString().startsWith(ClientBootstrap.SERVICE_NAME + "#" + ClientBootstrap.METHOD_NAME_HELLO)) {
                String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
            ////呼叫的是test方法,就執行hello方法,返回給客戶端
            if (msg.toString().startsWith(ClientBootstrap.SERVICE_NAME + "#" + ClientBootstrap.METHOD_NAME_TEST)) {
                String result = new HelloServiceImpl().test(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
                ctx.writeAndFlush(result);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    
  3. 服務類

    public class NettyServer {
        public static void startServer(String hostName, int port) {
            startServer0(hostName, port);
        }
    
        //編寫一個方法,完成對NettyServer的初始化和啟動
    
        private static void startServer0(String hostname, int port) {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
    
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                serverBootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                                          @Override
                                          protected void initChannel(SocketChannel ch) throws Exception {
                                              ChannelPipeline pipeline = ch.pipeline();
                                              pipeline.addLast(new StringDecoder());
                                              pipeline.addLast(new StringEncoder());
                                              pipeline.addLast(new NettyServerHandler()); //業務處理器
    
                                          }
                                      }
    
                        );
    
                ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync();
                System.out.println("服務提供方開始提供服務~~");
                channelFuture.channel().closeFuture().sync();
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
  4. 啟動類

    public class MyServerBootstrap {
        public static void main(String[] args) {
            //程式碼代填..
            NettyServer.startServer("127.0.0.1", 7000);
        }
    }
    

3.3 伺服器端實現

  1. 客戶端處理器

    public class NettyClientHandler  extends ChannelInboundHandlerAdapter implements Callable {
        private ChannelHandlerContext context;//上下文
        private String result; //返回的結果
        private String para; //客戶端呼叫方法時,傳入的引數
    
        //與伺服器的連線建立後,就會被呼叫, 這個方法是第一個被呼叫(1)
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println(" channelActive 被呼叫  ");
            context = ctx; //因為我們在其它方法會使用到 ctx
        }
    
        //收到伺服器的資料後,呼叫方法 (4)
        //
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println(" channelRead 被呼叫  ");
            result = msg.toString();
            notify(); //喚醒等待的執行緒
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    
        //被代理物件呼叫, 傳送資料給伺服器,-> wait -> 等待被喚醒(channelRead) -> 返回結果 (3)-》5
        @Override
        public synchronized Object call() throws Exception {
            System.out.println(" call1 被呼叫  ");
            context.writeAndFlush(para);
            //進行wait
            wait(); //等待channelRead 方法獲取到伺服器的結果後,喚醒
            System.out.println(" call2 被呼叫  ");
            return result; //服務方返回的結果
    
        }
    
        //(2)
        void setPara(String para) {
            System.out.println(" setPara  ");
            this.para = para;
        }
    }
    
  2. 客戶

    public class NettyClient {
        //建立執行緒池
        private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    
        private static NettyClientHandler client;
        private int count = 0;
    
        //編寫方法使用代理模式,獲取一個代理物件
    
        public Object getBean(final Class<?> serivceClass, final String providerName) {
    
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                    new Class<?>[]{serivceClass}, (proxy, method, args) -> {
    
                        System.out.println("(proxy, method, args) 進入...." + (++count) + " 次");
                        //{}  部分的程式碼,客戶端每呼叫一次 hello, 就會進入到該程式碼
                        if (client == null) {
                            initClient();
                        }
    
                        //設定要發給伺服器端的資訊
                        //服務名#方法名#引數
                        client.setPara(providerName +"#"+ method.getName() + "#" + args[0]);
    
                        //
                        return executor.submit(client).get();
    
                    });
        }
    
        //初始化客戶端
        private static void initClient() {
            client = new NettyClientHandler();
            //建立EventLoopGroup
            NioEventLoopGroup group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(
                            new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel ch) throws Exception {
                                    ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast(new StringDecoder());
                                    pipeline.addLast(new StringEncoder());
                                    pipeline.addLast(client);
                                }
                            }
                    );
    
            try {
                bootstrap.connect("127.0.0.1", 7000).sync();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
  3. 客戶端啟動類

    public class ClientBootstrap {
        //服務介面
        public static final String SERVICE_NAME = "HelloService";
        //服務方法
        public static final String METHOD_NAME_TEST = "test";
        public static final String METHOD_NAME_HELLO = "hello";
    
        public static void main(String[] args) throws Exception {
    
            //建立一個消費者
            NettyClient customer = new NettyClient();
    
            //建立代理物件
            HelloService service = (HelloService) customer.getBean(HelloService.class, SERVICE_NAME);
    
            for (; ; ) {
                Thread.sleep(2 * 1000);
                //通過代理物件呼叫服務提供者的方法(服務)
                String res = service.test("你好 dubbo~");
                System.out.println("test呼叫的結果 res= " + res);
                String res2 = service.hello("你好 dubbo~");
                System.out.println("hello呼叫的結果 res= " + res2);
            }
        }
    }