1. 程式人生 > 其它 >RPC實現Consumer 遠端呼叫

RPC實現Consumer 遠端呼叫

梳理一下基本的實現思路,主要完成一個這樣的功能:API 模組中的介面功能在服務端實現(並沒有在客戶端實現)。因此,客戶端呼叫API 中定義的某一個介面方法時,實際上是要發起一次網路請求去呼叫服務端的某一個服務。而這個網路請求首先被註冊中心接收,由註冊中心先確定需要呼叫的服務的位置,再將請求轉發至真實的服務實現,最終呼叫服務端程式碼,將返回值通過網路傳輸給客戶端。整個過程對於客戶端而言是完全無感知的,就像呼叫本地方法一樣。具體呼叫過程如下圖所示:

下面來看程式碼實現,建立RpcProxy 類:

import java.lang.reflect.Proxy;
public class RpcProxy {
	public static <T> T create(Class<?> clazz){
		//clazz 傳進來本身就是interface
		MethodProxy proxy = new MethodProxy(clazz);
		Class<?> [] interfaces = clazz.isInterface() ?
		new Class[]{clazz} :
		clazz.getInterfaces();
		T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);
		return result;
	}
}

在RpcProxy 類的內部實現遠端方法呼叫的代理類,即由Netty 傳送網路請求,具體程式碼如下:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class RpcProxy {
	public static <T> T create(Class<?> clazz){
	}
	private static class MethodProxy implements InvocationHandler {
		private Class<?> clazz;
		public MethodProxy(Class<?> clazz){
			this.clazz = clazz;
		}
		public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			//如果傳進來是一個已實現的具體類(本次演示略過此邏輯)
			if (Object.class.equals(method.getDeclaringClass())) {
			try {
				return method.invoke(this, args);
			} catch (Throwable t) {
				t.printStackTrace();
			}//如果傳進來的是一個介面(核心)
			} else {
				return rpcInvoke(proxy,method, args);
			}
			return null;
		}
		/**
		* 實現介面的核心方法
		* @param method
		* @param args
		* @return
		*/
		public Object rpcInvoke(Object proxy,Method method,Object[] args){
			//傳輸協議封裝
			InvokerProtocol msg = new InvokerProtocol();
			msg.setClassName(this.clazz.getName());
			msg.setMethodName(method.getName());
			msg.setValues(args);
			msg.setParames(method.getParameterTypes());
			final RpcProxyHandler consumerHandler = new RpcProxyHandler();
			EventLoopGroup group = new NioEventLoopGroup();
			try {
			Bootstrap b = new Bootstrap();
			b.group(group)
			.channel(NioSocketChannel.class)
			.option(ChannelOption.TCP_NODELAY, true)
			.handler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ChannelPipeline pipeline = ch.pipeline();
					//自定義協議解碼器
					/** 入參有5 個,分別解釋如下
					maxFrameLength:框架的最大長度。如果幀的長度大於此值,則將丟擲TooLongFrameException。
					lengthFieldOffset:長度欄位的偏移量:即對應的長度欄位在整個訊息資料中得位置
					lengthFieldLength:長度欄位的長度:如:長度欄位是int 型表示,那麼這個值就是4(long 型就是8)
					lengthAdjustment:要新增到長度欄位值的補償值
					initialBytesToStrip:從解碼幀中去除的第一個位元組數
					*/
					pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
					//自定義協議編碼器
					pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
					//物件引數型別編碼器
					pipeline.addLast("encoder", new ObjectEncoder());
					//物件引數型別解碼器
					pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,
					ClassResolvers.cacheDisabled(null)));
					pipeline.addLast("handler",consumerHandler);
				}
			});
			ChannelFuture future = b.connect("localhost", 8080).sync();
			future.channel().writeAndFlush(msg).sync();
			future.channel().closeFuture().sync();
			} catch(Exception e){
				e.printStackTrace();
			}finally {
				group.shutdownGracefully();
			}
			return consumerHandler.getResponse();
		}
	}
}

接收網路呼叫的返回值

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
	private Object response;
	public Object getResponse() {
		return response;
	}
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		response = msg;
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		System.out.println("client exception is general");
	}
}

完成客戶端呼叫程式碼:

public class RpcConsumer {
	public static void main(String [] args){
		IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);
		System.out.println(rpcHello.hello("Tom 老師"));
		IRpcService service = RpcProxy.create(IRpcService.class);
		System.out.println("8 + 2 = " + service.add(8, 2));
		System.out.println("8 - 2 = " + service.sub(8, 2));
		System.out.println("8 * 2 = " + service.mult(8, 2));
		System.out.println("8 / 2 = " + service.div(8, 2));
	}
}