netty之 -- 手寫rpc框架
阿新 • • 發佈:2020-09-16
接下來手寫一個簡陋的rpc框架,首先分析一下呼叫流程
話不多說,直接上程式碼:
一個公共介面,相當於protobuf協議中的proto檔案
package com.yang.java.main.netty.rpc.publicInterface; /** * Description: * * @author mark * Date 2020/9/16 */ public interface PublicInterface { String hello(String msg); }
服務端實現:
實現宣告的介面
package com.yang.java.main.netty.rpc.provider;import com.yang.java.main.netty.rpc.publicInterface.PublicInterface; /** * Description: * * @author mark * Date 2020/9/16 */ public class PublicInterfaceImpl implements PublicInterface { @Override public String hello(String msg) { System.out.println("receive from customer: " + msg);return "provider receive the message: " + msg; } }
netty的服務端
package com.yang.java.main.netty.rpc.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * Description: * * @author mark * Date 2020/9/16 */ public class NettyServer { // 學習一下netty的命名方法 public static void startServer(String host, int port){ startServer0(host, port); } private static void startServer0(String host, int port){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(new NettyServerHandler()); //業務處理器 } } ); ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync(); System.out.println("provider is start ~~"); channelFuture.channel().closeFuture().sync(); }catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package com.yang.java.main.netty.rpc.netty; import com.yang.java.main.netty.rpc.provider.PublicInterfaceImpl; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * Description: * * @author mark * Date 2020/9/16 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //獲取客戶端傳送的訊息,並呼叫服務 System.out.println("msg: " + msg); String result = new PublicInterfaceImpl().hello(msg.toString()); ctx.writeAndFlush(result); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
package com.yang.java.main.netty.rpc.provider; import com.yang.java.main.netty.rpc.netty.NettyServer; /** * Description: * * @author mark * Date 2020/9/16 */ public class ProviderBootstrap { public static void main(String[] args) { NettyServer.startServer("127.0.0.1", 7000); } }
netty的客戶端
package com.yang.java.main.netty.rpc.netty; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.lang.reflect.Proxy; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Description: * * @author mark * Date 2020/9/16 */ public class NettyClient { //建立執行緒池 private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); private static NettyClientHandler client; private int count = 0; public Object getBean(final Class<?> providerClass) { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{providerClass}, (proxy, method, args) -> { System.out.println("(proxy, method, args) come " + (++count) + "time"); if (client == null) { initClient(); } client.setParas((String) args[0]); return executor.submit(client).get(); // 這個其實就是呼叫call方法 }); } // 初始化客戶端 private static void initClient() { client = new NettyClientHandler(); //建立EventLoopGroup NioEventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new StringDecoder()); pipeline.addLast(new StringEncoder()); pipeline.addLast(client); } } ); try { bootstrap.connect("127.0.0.1", 7000).sync(); } catch (Exception e) { e.printStackTrace(); } } }
package com.yang.java.main.netty.rpc.netty; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.Callable; /** * Description: * 需要繼承CallAble * * @author mark * Date 2020/9/16 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { //上下文,因此需要正在call方法中使用,因此快取起來 private ChannelHandlerContext context; //返回的結果 private String result; //客戶端呼叫方法時,傳入的引數 private String paras; // 必須加同步鎖,通過wait等到channelRead @Override public synchronized Object call() throws Exception { System.out.println("call before wait"); context.writeAndFlush(paras); wait(); System.out.println("call after wait"); return result; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); context = ctx; } // 必須加同步鎖,完成之後通過notify告知call已收到訊息 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("channelRead"); result = msg.toString(); notify(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public void setParas(String paras) { System.out.println("set paras"); this.paras = paras; } }
package com.yang.java.main.netty.rpc.consumer; import com.yang.java.main.netty.rpc.netty.NettyClient; import com.yang.java.main.netty.rpc.publicInterface.PublicInterface; /** * Description: * * @author mark * Date 2020/9/16 */ public class ClientBootstrap { public static void main(String[] args){ //建立一個消費者 NettyClient customer = new NettyClient(); //建立代理物件 PublicInterface provider = (PublicInterface) customer.getBean(PublicInterface.class); for (;; ) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //通過代理物件呼叫服務提供者的方法(服務) String res = provider.hello("hello rpc~"); System.out.println("result: " + res); } } }
結果分析,符合預期
proxy, method, args) come 1time set paras channelActive call before wait channelRead call after wait result: provider receive the message: hello rpc~ (proxy, method, args) come 2time set paras call before wait channelRead call after wait result: provider receive the message: hello rpc~
簡要步驟如下:
- 服務消費方(customer)以本地呼叫方式呼叫服務
- customer stub(grpc一般這樣簡寫) 接收到呼叫後負責將方法、引數等封裝成能夠進行網路傳輸的訊息體
- customer stub 將訊息進行編碼併發送到服務端
- provider stub 收到訊息後進行解碼
- providerstub 根據解碼結果呼叫本地的服務
- 本地服務執行並將結果返回給 providerstub
- providerstub 將返回匯入結果進行編碼併發送至消費方
- client stub 接收到訊息並進行解碼
- 服務消費方(customer)得到結果