從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
RPC-Common模組提供RPC-Server和RPC-Client的通用物件,封裝統一規則,使RPC Server和RPC Client 可以基於同一協議通訊。主要包含底層通訊的Netty所需的編碼解碼器(RpcEncoder,RpcDecoder),實現自定義協議的傳輸物件(RpcRequest、RpcResponse)以及編碼解碼器對Java物件序列化(反序列化)使用的工具 ProtoSerializationUtil。
文章目錄
一 模組介紹
結構
結構如下圖
流程圖
- RPC-Client封裝服務請求傳送到RPC-Server
- RPC-Server獲取請求,執行業務處理,返回結果
- RPC-Client獲取結果進行處理
二 pom檔案
RpcRequest和RpcResponse是Java bean,不需要多餘的依賴。
RpcEncoder和RpcDecoder是Netty編解碼器,依賴於netty-all。
ProtoSerializationUtil是基於ProtoStuff的序列化工具,並依賴objenesis實現更高階的反射功能,進行物件反序列化。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-netty-common</artifactId>
<parent>
<groupId>com.github.linshenkx</groupId>
<artifactId>rpc-netty-spring-boot-starter</artifactId>
<version>1.0.5.RELEASE</version>
<relativePath>../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-core</artifactId>
</dependency>
<dependency>
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
</dependency>
<dependency>
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
</dependency>
</dependencies>
</project>
三 RpcRequest和RpcResponse
RpcRequest
RpcRequest是Rpc請求物件,其屬性相當於自定義協議規則,如下
其中requestId是為了區分對相同服務的不同請求,還可以加上超時時間、版本資訊等來定製自己的協議
@Data
public class RpcRequest {
/**
* 請求ID
*/
private String requestId;
/**
* 介面名稱
*/
private String interfaceName;
/**
* 方法名
*/
private String methodName;
/**
* 方法引數型別列表
*/
private Class<?>[] parameterTypes;
/**
* 引數列表
*/
private Object[] parameters;
}
RpcResponse
以下為最簡單也是最基本的RpcResponse,複雜一點可以區分訊息頭和訊息體,在訊息頭裡指定編碼長度,編碼型別等
@Data
public class RpcResponse {
/**
* 對應請求的requestId
*/
private String requestId;
/**
* 異常資訊
*/
private Exception exception;
/**
* 響應結果
*/
private Object result;
}
四 序列化工具
目前只提供了基於protostuff的序列化工具,後面可以升級成一個序列化引擎,可以動態選擇序列化方式
protostuff的GitHub地址為:https://github.com/protostuff/protostuff
public class ProtoSerializationUtil {
private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();
private static final Objenesis objenesis = new ObjenesisStd(true);
/**
* 序列化(物件 -> 位元組陣列)
*/
@SuppressWarnings("unchecked")
public static <T> byte[] serialize(T obj) {
Class<T> cls = (Class<T>) obj.getClass();
LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
try {
Schema<T> schema = getSchema(cls);
return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
} finally {
buffer.clear();
}
}
/**
* 反序列化(位元組陣列 -> 物件)
*/
public static <T> T deserialize(byte[] data, Class<T> cls) {
try {
T message = objenesis.newInstance(cls);
Schema<T> schema = getSchema(cls);
ProtostuffIOUtil.mergeFrom(data, message, schema);
return message;
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
@SuppressWarnings("unchecked")
private static <T> Schema<T> getSchema(Class<T> cls) {
Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
if (schema == null) {
schema = RuntimeSchema.createFrom(cls);
cachedSchema.put(cls, schema);
}
return schema;
}
}
五 RpcEncoder和RpcDecoder
RpcEncoder編碼器
編碼器負責將Java物件序列化成位元組陣列,再封裝在Netty 的ByteBuf裡面
public class RpcEncoder extends MessageToByteEncoder {
private Class<?> genericClass;
public RpcEncoder(Class<?> genericClass){
this.genericClass=genericClass;
}
@Override
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
if(genericClass.isInstance(msg)){
byte[] data= ProtoSerializationUtil.serialize(msg);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
RpcDecoder
傳送的時候比較簡單,因為byteBuf是動態擴充套件的。但接收的時候就需要考慮半包、粘包的問題了。這裡只是用了(in.readableBytes()<4)來簡單處理,但如果傳輸的物件比較大,就應該考慮加其他的Decoder來解決。這裡暫未擴充套件。
public class RpcDecoder extends ByteToMessageDecoder {
private Class<?> genericClass;
public RpcDecoder(Class<?> genericClass){
this.genericClass=genericClass;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if(in.readableBytes()<4){
return;
}
in.markReaderIndex();
int dataLength=in.readInt();
if(in.readableBytes()<dataLength){
in.resetReaderIndex();
return;
}
byte[] data=new byte[dataLength];
in.readBytes(data);
out.add(ProtoSerializationUtil.deserialize(data,genericClass));
}
}