RPC實現Consumer 遠端呼叫
阿新 • • 發佈:2021-01-14
梳理一下基本的實現思路,主要完成一個這樣的功能:API 模組中的介面功能在服務端實現(並沒有在客戶端實現)。因此,客戶端呼叫API 中定義的某一個介面方法時,實際上是要發起一次網路請求去呼叫服務端的某一個服務。而這個網路請求首先被註冊中心接收,由註冊中心先確定需要呼叫的服務的位置,再將請求轉發至真實的服務實現,最終呼叫服務端程式碼,將返回值通過網路傳輸給客戶端。整個過程對於客戶端而言是完全無感知的,就像呼叫本地方法一樣。具體呼叫過程如下圖所示:
下面來看程式碼實現,建立RpcProxy 類:
import java.lang.reflect.Proxy; public class RpcProxy { public static <T> T create(Class<?> clazz){ //clazz 傳進來本身就是interface MethodProxy proxy = new MethodProxy(clazz); Class<?> [] interfaces = clazz.isInterface() ? new Class[]{clazz} : clazz.getInterfaces(); T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy); return result; } }
在RpcProxy 類的內部實現遠端方法呼叫的代理類,即由Netty 傳送網路請求,具體程式碼如下:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class RpcProxy { public static <T> T create(Class<?> clazz){ } private static class MethodProxy implements InvocationHandler { private Class<?> clazz; public MethodProxy(Class<?> clazz){ this.clazz = clazz; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //如果傳進來是一個已實現的具體類(本次演示略過此邏輯) if (Object.class.equals(method.getDeclaringClass())) { try { return method.invoke(this, args); } catch (Throwable t) { t.printStackTrace(); }//如果傳進來的是一個介面(核心) } else { return rpcInvoke(proxy,method, args); } return null; } /** * 實現介面的核心方法 * @param method * @param args * @return */ public Object rpcInvoke(Object proxy,Method method,Object[] args){ //傳輸協議封裝 InvokerProtocol msg = new InvokerProtocol(); msg.setClassName(this.clazz.getName()); msg.setMethodName(method.getName()); msg.setValues(args); msg.setParames(method.getParameterTypes()); final RpcProxyHandler consumerHandler = new RpcProxyHandler(); EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //自定義協議解碼器 /** 入參有5 個,分別解釋如下 maxFrameLength:框架的最大長度。如果幀的長度大於此值,則將丟擲TooLongFrameException。 lengthFieldOffset:長度欄位的偏移量:即對應的長度欄位在整個訊息資料中得位置 lengthFieldLength:長度欄位的長度:如:長度欄位是int 型表示,那麼這個值就是4(long 型就是8) lengthAdjustment:要新增到長度欄位值的補償值 initialBytesToStrip:從解碼幀中去除的第一個位元組數 */ pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); //自定義協議編碼器 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); //物件引數型別編碼器 pipeline.addLast("encoder", new ObjectEncoder()); //物件引數型別解碼器 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); pipeline.addLast("handler",consumerHandler); } }); ChannelFuture future = b.connect("localhost", 8080).sync(); future.channel().writeAndFlush(msg).sync(); future.channel().closeFuture().sync(); } catch(Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } return consumerHandler.getResponse(); } } }
接收網路呼叫的返回值
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class RpcProxyHandler extends ChannelInboundHandlerAdapter { private Object response; public Object getResponse() { return response; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { response = msg; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("client exception is general"); } }
完成客戶端呼叫程式碼:
public class RpcConsumer {
public static void main(String [] args){
IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);
System.out.println(rpcHello.hello("Tom 老師"));
IRpcService service = RpcProxy.create(IRpcService.class);
System.out.println("8 + 2 = " + service.add(8, 2));
System.out.println("8 - 2 = " + service.sub(8, 2));
System.out.println("8 * 2 = " + service.mult(8, 2));
System.out.println("8 / 2 = " + service.div(8, 2));
}
}