解密Dubbo:自己動手編寫RPC框架
用,現在我們就來動手自己編寫一個RPC框架,通過這篇文章的學習,你將學習到
- 分散式系統的概念
- RPC遠端方法呼叫的應用
- Dubbo的原理深入理解
當然,如果要完全自己編寫一個RPC框架,我們需要掌握以下知識點
- 網路程式設計(網路通訊) 本文將使用netty4網路通訊框架
- 多執行緒相關知識
- 反射相關知識
- jdk的動態代理
- Spring框架的相關知識
如果對於上述的知識點有一部分不是很理解,也不會影響你閱讀本文和對Dubbo的RPC呼叫原理的理解
好了,我們先來簡單的描述一下整個RPC呼叫的業務流程圖
為了可以實現上面的RPC呼叫,我們建立的RPC框架的模組之間的關係圖如下:
對於上面的每個模組的具體作用,使用一個表格簡單的進行描述
模組名稱 | 主要功能 |
---|---|
rpc-register | 主要完成可註冊中心Zookeeper的互動<br />RPC服務端使用該模組往註冊中心註冊地址和埠<br />RPC客戶端通過該模組獲取實時已近註冊的服務地址和埠 |
rpc-common | 定義RPC通訊的請求訊息和響應訊息的規則,以及訊息的序列化和反序列化的幫助類 |
rpc-server | RPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並儲存<br />接受RPC客戶端的訊息並且通過反射呼叫具體的方法<br/>響應RPC客戶端,把方法執行結果返回到RPC客戶端 |
rpc-client | RPC客戶端,通過網路通訊往RPC服務端傳送請求呼叫訊息<br/>接受服務端的響應訊息<br/>配置動態代理類,所有的方法呼叫都通過網路呼叫傳送到RPC服務端 |
app-common | 具體的應用中的介面和JavaBean物件,類似於service模組和bean模組 |
app-server | 通過Spring的配置啟動SpringContext,並且配置RpcServer和RpcRegistry Bean物件的建立<br />實現app-common中的介面,並且在介面上添加註解@RpcService(IProductService.class)可以讓RPCServer識別到該服務<br />啟動服務 |
app-client | 通過Spring的配置建立RpcDiscover物件和RpcProxy物件,其中RpcDiscover用於從註冊中心獲取到服務的地址資訊,RpcProxy用於建立類的動態代理物件 |
接下來我們來看一下具體的實現程式碼
-
rpc-register
這個模組使用者和註冊中心進行互動,主要包括三個類
- Constant常量定義,設定連線ZKServer的相關引數
- RpcRegistry:往註冊中心ZKServer設定地址資訊,RPC-Server需要使用
- RpcDiscover: 從註冊中心ZKServer獲取服務端的網路地址資訊 RPC-client需要使用
具體的實現程式碼
package cn.wolfcode.rpc.register; public interface Constant { //定義客戶端連線session會話超時時間,單位為毫秒,該值的設定和zkServer設定的心跳時間有關係 int SESSION_TIMEOUT=4000; // 定義用於儲存rpc通訊服務端的地址資訊的目錄 String REGISTRY_PATH="/rpc"; // 定義資料存放的具體目錄 String DATA_PATH=REGISTRY_PATH+"/data"; }
package cn.wolfcode.rpc.register; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.zookeeper.*; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Setter@Getter @AllArgsConstructor() @NoArgsConstructor public class RpcRegistry { public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class); //zkServer的地址資訊 private String registryAddress; //zk客戶端程式 private ZooKeeper zooKeeper; public void createNode(String data) throws Exception{ //建立一個客戶端程式, 對於註冊可以不用監聽事件 zooKeeper= new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { } }); if(zooKeeper!=null){ try{ //判斷註冊的目錄是否存在 Stat stat = zooKeeper.exists(Constant.REGISTRY_PATH, false); if(stat==null){ //如果不存在, 建立一個持久的節點目錄 zooKeeper.create(Constant.REGISTRY_PATH,null,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } //建立一個臨時的序列節點,並且儲存資料資訊 zooKeeper.create(Constant.DATA_PATH,data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL); }catch (Exception e){ LOGGER.error("",e); e.printStackTrace(); } }else{ LOGGER.debug("zooKeeper connect is null"); } } //測試程式 public static void main(String[] args) throws Exception { RpcRegistry rpcRegistry = new RpcRegistry(); rpcRegistry.setRegistryAddress("192.168.158.151:2181"); rpcRegistry.createNode("testdata"); //讓程式等待輸入,程式一直處於執行狀態 System.in.read(); } }
package cn.wolfcode.rpc.register; import lombok.Getter; import lombok.Setter; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; import java.util.Random; @Setter @Getter //地址發現,用於實時的獲取最新的RPC服務資訊 public class RpcDiscover { public static final Logger LOGGER=LoggerFactory.getLogger(RpcRegistry.class); //服務端地址 zkServer的地址 private String registryAddress; //獲取到的所有提供服務的伺服器列表 private volatile List<String> dataList=new ArrayList<>(); private ZooKeeper zooKeeper=null; //初始化zkClient客戶端 public RpcDiscover(String registryAddress) throws Exception { this.registryAddress = registryAddress; zooKeeper=new ZooKeeper(registryAddress, Constant.SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if(watchedEvent.getType()==Event.EventType.NodeChildrenChanged){ //監聽zkServer的伺服器列表變化 watchNode(); } } }); //獲取節點相關資料 watchNode(); } // 從dataList列表隨機獲取一個可用的服務端的地址資訊給rpc-client public String discover(){ int size=dataList.size(); if(size>0){ int index= new Random().nextInt(size); return dataList.get(index); } throw new RuntimeException("沒有找到對應的伺服器"); } //監聽服務端的列表資訊 private void watchNode(){ try{ //獲取子節點資訊 List<String> nodeList = zooKeeper.getChildren(Constant.REGISTRY_PATH, true); List<String> dataList=new ArrayList<>(); for (String node : nodeList) { byte[] bytes = zooKeeper.getData(Constant.REGISTRY_PATH + "/" + node, false, null); dataList.add(new String(bytes)); } this.dataList=dataList; }catch (Exception e){ LOGGER.error("",e); e.printStackTrace(); } } //測試程式 public static void main(String[] args) throws Exception { //列印獲取到的連線地址資訊 System.out.println(new RpcDiscover("192.168.158.151:2181").discover()); System.in.read(); } }
-
rpc-common
定義RPC通訊的請求訊息和響應訊息的規則,以及訊息的序列化和反序列化的幫助類,主要包括
- RpcRequest 請求訊息封裝物件
- RpcResponse 響應訊息封裝物件
- SerializationUtil 訊息的序列化,煩序列化幫助類
- RpcEncoder 把訊息物件轉換為位元組陣列進行通訊
- RpcDecoder 把獲取到的位元組陣列轉換為對應的訊息物件
具體程式碼如下
package cn.wolfcode.rpc.common; import lombok.*; @Setter @Getter @NoArgsConstructor @AllArgsConstructor @ToString //RPC通訊的資料請求規則 public class RpcRequest { // 請求訊息的訊息Id private String requestId; // 請求的具體的類名(介面名稱) private String className; // 請求的具體的方法名稱 private String methodName; // 請求的方法引數型別列表 private Class<?>[] parameterTypes; // 請求的方法引數列表 private Object[] parameters; }
package cn.wolfcode.rpc.common; import lombok.*; @Setter @Getter @NoArgsConstructor @AllArgsConstructor @ToString //RPC通訊訊息的響應資料規則 public class RpcResponse { //響應的訊息id private String responseId; //請求的訊息id private String requestId; // 響應的訊息是否成功 private boolean success; // 響應的資料結果 private Object result; // 如果有異常資訊,在該物件中記錄異常資訊 private Throwable throwable; }
package cn.wolfcode.rpc.common; import com.dyuproject.protostuff.LinkedBuffer; import com.dyuproject.protostuff.ProtostuffIOUtil; import com.dyuproject.protostuff.Schema; import com.dyuproject.protostuff.runtime.RuntimeSchema; import org.objenesis.Objenesis; import org.objenesis.ObjenesisStd; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 序列化工具類(基於 Protostuff 實現) 用於把物件序列化位元組陣列, 把位元組陣列反序列化物件 */ public class SerializationUtil { private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>(); private static Objenesis objenesis = new ObjenesisStd(true); private SerializationUtil() { } /** * 獲取類的schema * @param cls * @return */ @SuppressWarnings("unchecked") private static <T> Schema<T> getSchema(Class<T> cls) { Schema<T> schema = (Schema<T>) cachedSchema.get(cls); if (schema == null) { schema = RuntimeSchema.createFrom(cls); if (schema != null) { cachedSchema.put(cls, schema); } } return schema; } /** * 序列化(物件 -> 位元組陣列) */ @SuppressWarnings("unchecked") public static <T> byte[] serialize(T obj) { Class<T> cls = (Class<T>) obj.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema<T> schema = getSchema(cls); return ProtostuffIOUtil.toByteArray(obj, schema, buffer);//序列化 } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } /** * 反序列化(位元組陣列 -> 物件) */ public static <T> T deserialize(byte[] data, Class<T> cls) { try { /* * 如果一個類沒有引數為空的構造方法時候,那麼你直接呼叫newInstance方法試圖得到一個例項物件的時候是會丟擲異常的 * 通過ObjenesisStd可以完美的避開這個問題 * */ T message = (T) objenesis.newInstance(cls);//例項化 Schema<T> schema = getSchema(cls);//獲取類的schema ProtostuffIOUtil.mergeFrom(data, message, schema); return message; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
package cn.wolfcode.rpc.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; //對傳遞的訊息進行編碼, 因為是請求/響應物件的傳遞,先編碼為位元組陣列在傳送到伺服器解碼 public class RpcEncoder extends MessageToByteEncoder { // 傳遞的資料的物件型別 private Class genericClass; public RpcEncoder(Class genericClass) { this.genericClass = genericClass; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if(genericClass.isInstance(msg)){ //序列化請求訊息為位元組陣列 byte[] bytes = SerializationUtil.serialize(msg); // 把資料寫入到下一個通道(channel)或者是發往服務端 out.writeBytes(bytes); } } }
package cn.wolfcode.rpc.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; //對傳遞的訊息進行解碼, 接受到的資料是位元組陣列,需要把陣列轉換為對應的請求/響應訊息物件 public class RpcDecoder extends ByteToMessageDecoder { private Class<?> genericClass; public RpcDecoder(Class<?> genericClass) { this.genericClass = genericClass; } @Override //解碼方法,把位元組陣列轉換為訊息物件 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //訊息的長度 int size=in.readableBytes(); if(size<4){//保證所有的訊息都完全接受完成 return; } byte[] bytes =new byte[size]; //把傳遞的位元組陣列讀取到bytes中 in.readBytes(bytes); // 反序列化為物件(RPCRequest/RPCResponse物件) Object object = SerializationUtil.deserialize(bytes, genericClass); //輸出物件 out.add(object); //重新整理快取 ctx.flush(); } }
-
rpc-server
RPC服務端,啟動RPC服務,掃描app-server中的所有可以提供的服務列表並儲存,接受RPC客戶端的訊息並且通過反射呼叫具體的方法,響應RPC客戶端,把方法執行結果返回到RPC客戶端
主要包括:
- RpcService 定義一個註解,用於標記服務程式的提供者,通過Spring掃描出所有的服務並且儲存
- RpcServerHandler 處理RPC客戶端請求,呼叫服務提供者的具體方法,響應執行結果
- RpcServer 掃描所有的服務(標記了@RPCService的類),啟動RPC服務
package cn.wolfcode.rpc.server; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 這個註解用於貼在每個提供服務的實現類, * 在Spring容器啟動的時候,自動掃描到貼了該註解的所有的服務 */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface RpcService { public Class<?> value(); }
package cn.wolfcode.rpc.server; import cn.wolfcode.rpc.common.RpcDecoder; import cn.wolfcode.rpc.common.RpcEncoder; import cn.wolfcode.rpc.common.RpcRequest; import cn.wolfcode.rpc.common.RpcResponse; import cn.wolfcode.rpc.register.RpcRegistry; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.commons.collections4.MapUtils; import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import java.util.HashMap; import java.util.Map; @Setter @Getter @NoArgsConstructor @AllArgsConstructor //RPC服務端啟動,實現Spring的感知介面 public class RpcServer implements ApplicationContextAware,InitializingBean { //用於儲存所有提供服務的方法, 其中key為類的全路徑名, value是所有的實現類 private final Map<String,Object> serviceBeanMap=new HashMap<>(); //rpcRegistry 用於註冊相關的地址資訊 private RpcRegistry rpcRegistry; //提供服務的地址資訊 格式為 192.168.158.151:9000 類似 private String serverAddress; //在Spring容器啟動完成後會執行該方法 @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { //獲取到所有貼了RpcService註解的Bean物件 Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class); if(MapUtils.isNotEmpty(serviceBeanMap)){ for (Object object : serviceBeanMap.values()) { //獲取到類的路徑名稱 String serviceName = object.getClass().getAnnotation(RpcService.class).value().getName(); //把獲取到的資訊儲存到serviceBeanMap中 this.serviceBeanMap.put(serviceName,object); } } System.out.println("伺服器: "+serverAddress +" 提供的服務列表: "+ serviceBeanMap ); } // 初始化完成後執行 @Override public void afterPropertiesSet() throws Exception { //建立服務端的通訊物件 ServerBootstrap server = new ServerBootstrap(); // 建立非同步通訊的事件組 用於建立TCP連線的 NioEventLoopGroup bossGroup = new NioEventLoopGroup(); // 建立非同步通訊的事件組 用於處理Channel(通道)的I/O事件 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try{ //開始設定server的相關引數 server.group(bossGroup,workerGroup) //啟動非同步ServerSocket .channel(NioServerSocketChannel.class) //初始化通道資訊 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RpcDecoder(RpcRequest.class))//1 解碼請求引數 .addLast(new RpcEncoder(RpcResponse.class))//2 編碼響應資訊 .addLast(new RpcServerHandler(serviceBeanMap));//3 請求處理 } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true);; String host=serverAddress.split(":")[0] ;//獲取到主機地址 int port=Integer.valueOf(serverAddress.split(":")[1]);//埠 ChannelFuture future = server.bind(host, port).sync();//開啟非同步通訊服務 System.out.println("伺服器啟動成功:"+future.channel().localAddress()); rpcRegistry.createNode(serverAddress); System.out.println("向zkServer註冊服務地址資訊"); future.channel().closeFuture().sync();//等待通訊完成 }catch (Exception e){ e.printStackTrace(); }finally { //優雅的關閉socket bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
package cn.wolfcode.rpc.server;
import cn.wolfcode.rpc.common.RpcRequest;
import cn.wolfcode.rpc.common.RpcResponse;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.UUID;
@Setter
@Getter
@NoArgsConstructor
public class RpcServerHandler extends ChannelInboundHandlerAdapter{
private Map<String,Object> serviceBeanMap;
public RpcServerHandler(Map<String, Object> serviceBeanMap) {
this.serviceBeanMap = serviceBeanMap;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("RpcServerHandler.channelRead");
System.out.println(msg);
RpcRequest rpcRequest= (RpcRequest) msg;
RpcResponse rpcResponse=handler(rpcRequest);
//告訴客戶端,關閉socket連線
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
}
private RpcResponse handler(RpcRequest rpcRequest) {
//建立一個響應訊息物件
RpcResponse rpcResponse =new RpcResponse();
//設定響應訊息ID
rpcResponse.setResponseId(UUID.randomUUID().toString());
//請求訊息ID
rpcResponse.setRequestId(rpcRequest.getRequestId());
try{
//獲取到類名(介面名稱)
String className = rpcRequest.getClassName();
//獲取到方法名
String methodName = rpcRequest.getMethodName();
//獲取到引數型別列表
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
//獲取到引數列表
Object[] parameters = rpcRequest.getParameters();
//獲取到具位元組碼物件
Class<?> clz = Class.forName(className);
//獲取到實現類
Object serviceBean = serviceBeanMap.get(className);
if(serviceBean==null){
throw new RuntimeException(className+"沒有找到對應的serviceBean:"+className+":beanMap:"+serviceBeanMap);
}
//反射呼叫方法
Method method = clz.getMethod(methodName, parameterTypes);
if(method==null)
throw new RuntimeException("沒有找到對應的方法");
Object result = method.invoke(serviceBean, parameters);
rpcResponse.setSuccess(true);
//設定方法呼叫的結果
rpcResponse.setResult(result);
}catch (Exception e){
rpcResponse.setSuccess(false);
rpcResponse.setThrowable(e);
e.printStackTrace();
}
return rpcResponse;
}
}
-
rpc-client
RPC客戶端,通過網路通訊往RPC服務端傳送請求呼叫訊息,接受服務端的響應訊息,配置動態代理類,所有的方法呼叫都通過網路呼叫傳送到RPC服務端
其中包括的主要程式碼:
- RpcProxy 對於每一個類都建立一個動態代理物件,並且在invoke方法建立rpc客戶端並且傳送網路通訊請求
- RpcClient RPC通訊客戶端,啟動RPC通訊服務,建立TCP連線,傳送請求,接受響應
具體實現程式碼:
package cn.wolfcode.rpc.client;
import cn.wolfcode.rpc.common.RpcDecoder;
import cn.wolfcode.rpc.common.RpcEncoder;
import cn.wolfcode.rpc.common.RpcRequest;
import cn.wolfcode.rpc.common.RpcResponse;
import cn.wolfcode.rpc.register.RpcDiscover;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
//RPC通訊客戶端,往服務端傳送請求,並且接受服務端的響應
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {
//訊息響應物件
private RpcResponse rpcResponse;
//訊息請求物件
private RpcRequest rpcRequest;
// 同步鎖 資源物件
private Object object=new Object();
// 用於獲取服務地址列表資訊
private RpcDiscover rpcDiscover;
//建構函式
public RpcClient(RpcRequest rpcRequest,RpcDiscover rpcDiscover) {
this.rpcDiscover = rpcDiscover;
this.rpcRequest=rpcRequest;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
this.rpcResponse=msg;//響應訊息
synchronized (object){
ctx.flush();//重新整理快取
object.notifyAll();//喚醒等待
}
}
//傳送訊息
public RpcResponse send() throws Exception {
//建立一個socket通訊物件
Bootstrap client = new Bootstrap();
//建立一個通訊組,負責Channel(通道)的I/O事件的處理
NioEventLoopGroup loopGroup = new NioEventLoopGroup();
try{
client.group(loopGroup)//設定引數
.channel(NioSocketChannel.class)//使用非同步socket通訊
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RpcEncoder(RpcRequest.class))//編碼請求物件
.addLast(new RpcDecoder(RpcResponse.class))//解碼響應物件
.addLast(RpcClient.this);//傳送請求物件
}
}).option(ChannelOption.SO_KEEPALIVE, true);;
String serverAddress = rpcDiscover.discover();//獲取一個伺服器地址
String host=serverAddress.split(":")[0];
int port=Integer.valueOf(serverAddress.split(":")[1]);
ChannelFuture future = client.connect(host,port).sync();
System.out.println("客戶端準備傳送資料:"+rpcRequest);
future.channel().writeAndFlush(rpcRequest).sync();
synchronized (object){
object.wait();//執行緒等待,等待客戶端響應
}
if (rpcResponse != null) {
future.channel().closeFuture().sync();//等待服務端關閉socket
}
return rpcResponse;
}finally {
loopGroup.shutdownGracefully();//優雅關閉socket
}
}
/**
* 異常處理
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
ctx.close();
}
}
package cn.wolfcode.rpc.client;
import cn.wolfcode.rpc.common.RpcRequest;
import cn.wolfcode.rpc.common.RpcResponse;
import cn.wolfcode.rpc.register.RpcDiscover;
import lombok.Getter;
import lombok.Setter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.UUID;
@Setter
@Getter
//動態代理類,用於獲取到每個類的代理物件
//對於被代理物件的所有的方法呼叫都會執行invoke方法
public class RpcProxy {
//用於獲取到RPC-Server的地址資訊
private RpcDiscover rpcDiscover;
@SuppressWarnings("all")
public <T> T getInstance(Class<T> interfaceClass){
T instance = (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//建立請求物件
RpcRequest rpcRequest = new RpcRequest();
//獲取到被呼叫的類名 和RPC-Server中的serviceMap中的key進行匹配
String className=method.getDeclaringClass().getName();
//獲取到方法的引數列表
Class<?>[] parameterTypes = method.getParameterTypes();
//生成一個請求的id
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setClassName(className);//類名
rpcRequest.setParameterTypes(parameterTypes);//引數型別列表
rpcRequest.setParameters(args);//引數列表
rpcRequest.setMethodName(method.getName());//呼叫的放方法名稱
RpcResponse rpcResponse = new RpcClient(rpcRequest, rpcDiscover).send();//建立一個RPCclient物件,並且傳送訊息到服務端
//返回呼叫結果
return rpcResponse.getResult();
}
});
//返回一個代理物件
return instance;
}
}
-
app-common
這是具體應用的通用模組,和具體的專案結構有關係,這裡主要包括介面定義和JavaBean物件的定義
具體程式碼為:
package cn.wolfcode.app.common; public interface IProductService { /** * 儲存產品 * @param product */ void save(Product product); /** * 根據產品id刪除產品 * @param productId */ void deleteById(Long productId); /** * 修改產品資訊 * @param product */ void update(Product product); /** * 根據產品id獲取到產品資訊 * @param productId * @return */ Product get(Long productId); }
package cn.wolfcode.app.common; import lombok.*; import java.math.BigDecimal; /** * 產品資訊 */ @Setter @Getter @ToString @AllArgsConstructor @NoArgsConstructor public class Product { private Long id;//id private String sn;//產品編號 private String name;//產品名稱 private BigDecimal price;//產品價格 }
-
app-server
這個模組主要是定義服務的具體實現和啟動Spring容器,在啟動Spring容器的時候需要建立RpcRegistry,RpcServer物件
具體程式碼實現:
package cn.wolfcode.app.server; import cn.wolfcode.app.common.IProductService; import cn.wolfcode.app.common.Product; import cn.wolfcode.rpc.server.RpcService; import org.springframework.stereotype.Component; import java.math.BigDecimal; @Component @RpcService(IProductService.class) public class ProductServiceImpl implements IProductService { @Override public void save(Product product) { System.out.println("產品儲存成功: "+product); } @Override public void deleteById(Long productId) { System.out.println("產品刪除成功: "+ productId); } @Override public void update(Product product) { System.out.println("產品修改成功: "+ product); } @Override public Product get(Long productId) { System.out.println("產品獲取成功"); return new Product(1L,"001","膝上型電腦",BigDecimal.TEN); } }
package cn.wolfcode.app.server; import org.springframework.context.support.ClassPathXmlApplicationContext; public class BootAppServer { public static void main(String[] args) { //啟動Spring容器 new ClassPathXmlApplicationContext("classpath:application.xml"); } }
其中配置檔案:
- application.xml Spring的配置檔案
- log4j.properties 日誌配置檔案
- rpc.properties 服務提供者的地址和埠 以及zkServer的連線地址和埠
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="cn.wolfcode.app.server"/> <context:property-placeholder location="classpath:rpc.properties"/> <bean id="serviceRegistry" class="cn.wolfcode.rpc.register.RpcRegistry"> <property name="registryAddress" value="${registry.address}"/> </bean> <bean id="rpcServer" class="cn.wolfcode.rpc.server.RpcServer"> <property name="serverAddress" value="${server.address}"/> <property name="rpcRegistry" ref="serviceRegistry"/> </bean> </beans>
log4j.rootLogger=ERROR,console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.out log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%m%n log4j.logger.cn.wolfcode.rpc=DEBUG
# zookeeper server registry.address=192.168.158.151:2181 # rpc server server.address=192.168.158.1:9090
-
app-client
通過Spring的配置建立RpcDiscover物件和RpcProxy物件,其中RpcDiscover用於從註冊中心獲取到服務的地址資訊,RpcProxy用於建立類的動態代理物件
測試類:使用Spring的Junit進行測試
package cn.wolfcode.app.client;
import cn.wolfcode.app.common.IProductService;
import cn.wolfcode.app.common.Product;
import cn.wolfcode.rpc.client.RpcProxy;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.math.BigDecimal;
//模擬客戶端啟動
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations="classpath:application.xml")
public class APP {
@Autowired
private RpcProxy rpcProxy;
private IProductService productService;
@Before
public void init() {
productService = rpcProxy.getInstance(IProductService.class);
}
@Test
public void testSave() throws Exception {
productService.save(new Product(2L,"002","內衣",BigDecimal.TEN));
}
@Test
public void testDelete() throws Exception {
productService.deleteById(2L);
}
@Test
public void testUpdate() throws Exception {
productService.update(new Product(2L,"002","內衣",BigDecimal.ONE));
}
@Test
public void testGet() throws Exception {
Product product = productService.get(1L);
System.out.println("獲取到的產品資訊為:"+product);
}
}
配置檔案資訊
application.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:component-scan base-package="cn.wolfcode.app.client"/>
<context:property-placeholder location="classpath:rpc.properties"/>
<bean id="serviceRpcDiscover" class="cn.wolfcode.rpc.register.RpcDiscover">
<constructor-arg name="registryAddress" value="${registry.address}"/>
</bean>
<bean id="rpcProxy" class="cn.wolfcode.rpc.client.RpcProxy">
<property name="rpcDiscover" ref="serviceRpcDiscover"/>
</bean>
</beans>
log4j.properties
log4j.rootLogger=ERROR,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%m%n
log4j.logger.cn.wolfcode.rpc=DEBUG
rpc.properties
# zookeeper server
registry.address=192.168.158.151:2181
如果要正常執行,請部署一個zookeeper註冊中心,修改rpc.properites的地址即可
- 先執行app-server中的BootAppServer
- 在執行app-client中的APP測試用例