Netty實現簡單DubboRpc
阿新 • • 發佈:2021-08-11
Netty實現簡單DubboRpc
1. RPC 基本介紹
-
RPC(Remote Procedure Call)
—遠端過程呼叫,是一個計算機通訊協議。該協議允許運行於一臺計算機的程式呼叫另一臺計算機的子程式,而程式設計師無需額外地為這個互動作用程式設計 -
兩個或多個應用程式都分佈在不同的伺服器上,它們之間的呼叫都像是本地方法呼叫一樣(如圖)
-
常見的
RPC
框架有:比較知名的如阿里的Dubbo
、Google
的gRPC
、Go
語言的rpcx
、Apache
的thrift
,Spring
旗下的SpringCloud
。
2. RPC 呼叫流程圖
3. 自己實現dubboRpc
- 建立一個介面,定義抽象方法。用於消費者和提供者之間的約定。
- 建立一個提供者,該類需要監聽消費者的請求,並按照約定返回資料。
- 建立一個消費者,該類需要透明的呼叫自己不存在的方法,內部需要使用
Netty
請求提供者返回資料
3.1 通用介面
public interface HelloService {
String hello(String mes);
String test(String mes);
}
3.2 伺服器端實現
-
介面實現類
public class HelloServiceImpl implements HelloService { private static int count = 0; //當有消費方呼叫該方法時, 就返回一個結果 @Override public String hello(String mes) { System.out.println("收到客戶端訊息=" + mes); return "hello"; } @Override public String test(String mes) { return "返回測試訊息"; } }
-
服務處理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //獲取客戶端傳送的訊息,並呼叫服務 System.out.println("msg=" + msg); //客戶端在呼叫伺服器的api 時,我們需要定義一個協議 //呼叫的是hello方法,就執行hello方法,返回給客戶端 if (msg.toString().startsWith(ClientBootstrap.SERVICE_NAME + "#" + ClientBootstrap.METHOD_NAME_HELLO)) { String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } ////呼叫的是test方法,就執行hello方法,返回給客戶端 if (msg.toString().startsWith(ClientBootstrap.SERVICE_NAME + "#" + ClientBootstrap.METHOD_NAME_TEST)) { String result = new HelloServiceImpl().test(msg.toString().substring(msg.toString().lastIndexOf("#") + 1)); ctx.writeAndFlush(result); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
-
服務類
public class NettyServer { public static void startServer(String hostName, int port) { startServer0(hostName, port); } //編寫一個方法,完成對NettyServer的初始化和啟動 private static void startServer0(String hostname, int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //業務處理器 } } ); ChannelFuture channelFuture = serverBootstrap.bind(hostname, port).sync(); System.out.println("服務提供方開始提供服務~~"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
-
啟動類
public class MyServerBootstrap { public static void main(String[] args) { //程式碼代填.. NettyServer.startServer("127.0.0.1", 7000); } }
3.3 伺服器端實現
-
客戶端處理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ChannelHandlerContext context;//上下文 private String result; //返回的結果 private String para; //客戶端呼叫方法時,傳入的引數 //與伺服器的連線建立後,就會被呼叫, 這個方法是第一個被呼叫(1) @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" channelActive 被呼叫 "); context = ctx; //因為我們在其它方法會使用到 ctx } //收到伺服器的資料後,呼叫方法 (4) // @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(" channelRead 被呼叫 "); result = msg.toString(); notify(); //喚醒等待的執行緒 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } //被代理物件呼叫, 傳送資料給伺服器,-> wait -> 等待被喚醒(channelRead) -> 返回結果 (3)-》5 @Override public synchronized Object call() throws Exception { System.out.println(" call1 被呼叫 "); context.writeAndFlush(para); //進行wait wait(); //等待channelRead 方法獲取到伺服器的結果後,喚醒 System.out.println(" call2 被呼叫 "); return result; //服務方返回的結果 } //(2) void setPara(String para) { System.out.println(" setPara "); this.para = para; } }
-
客戶
public class NettyClient { //建立執行緒池 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; //編寫方法使用代理模式,獲取一個代理物件 public Object getBean(final Class<?> serivceClass, final String providerName) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{serivceClass}, (proxy, method, args) -> { System.out.println("(proxy, method, args) 進入...." + (++count) + " 次"); //{} 部分的程式碼,客戶端每呼叫一次 hello, 就會進入到該程式碼 if (client == null) { initClient(); } //設定要發給伺服器端的資訊 //服務名#方法名#引數 client.setPara(providerName +"#"+ method.getName() + "#" + args[0]); // return executor.submit(client).get(); }); } //初始化客戶端 private static void initClient() { client = new NettyClientHandler(); //建立EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } } ); try { bootstrap.connect("127.0.0.1", 7000).sync(); } catch (Exception e) { e.printStackTrace(); } } }
-
客戶端啟動類
public class ClientBootstrap { //服務介面 public static final String SERVICE_NAME = "HelloService"; //服務方法 public static final String METHOD_NAME_TEST = "test"; public static final String METHOD_NAME_HELLO = "hello"; public static void main(String[] args) throws Exception { //建立一個消費者 NettyClient customer = new NettyClient(); //建立代理物件 HelloService service = (HelloService) customer.getBean(HelloService.class, SERVICE_NAME); for (; ; ) { Thread.sleep(2 * 1000); //通過代理物件呼叫服務提供者的方法(服務) String res = service.test("你好 dubbo~"); System.out.println("test呼叫的結果 res= " + res); String res2 = service.hello("你好 dubbo~"); System.out.println("hello呼叫的結果 res= " + res2); } } }