SE為旗下90年代遊戲《Gex》在日本註冊商標 IP重啟在即
阿新 • • 發佈:2022-02-25
先附上GitHub地址:https://github.com/kosamino/netty-root/tree/master/ym-netty-rpc
RPC的實現方式是本地通過遠端代理物件呼叫遠端服務。在網際網路應用場景中,單體服務極度缺乏彈性伸縮能力,在大規模開發團隊中也不便於開發管理。所以往往會把服務根據模組進行垂直拆分,也就是我們說的SOA服務化。服務拆分後系統跟系統直接的業務互動往往依賴於RPC框架進行通訊。
通常RPC的服務端會提供對應的介面jar包,客戶端通過rpc框架功能拿到對應介面的代理例項,整個呼叫過程資料的包裝和通訊都是透明的。
呼叫流程
首先先來分析下RPC流程是怎樣的,如下圖:
我們包含三部分,使用者、Netty客戶端,Netty服務端:
- 使用者發起呼叫;
- Netty客戶端包裝請求;
- 客戶端對請求進行序列化(物件轉ByteBuf);
- 序列化後傳送訊息到服務端;
- 服務端會對請求進行反序列化解碼成具體物件;
- 服務端根據客戶端傳送的請求解析並準備返回結果;
- 服務端對返回結果序列化為ByteBuf;
- 客戶端收到返回資訊;
- 客戶端對返回資訊反列化得到Object資訊;
- 客戶端把結果返回給使用者呼叫方,完成整個請求。
包含技術
如上所示,就是整個RPC框架的簡單流程,在這個流程中需要使用哪些技術呢?
- 動態代理:通過java Proxy技術拿到代理物件,invocationHandler實現資料協議包裝和通訊。
- 序列化、反序列化
- 網路通訊:基於netty的客戶端和服務端進行通訊可以獲得很好的IO效能
- 反射:根據客戶端請求引數通過反射技術實現服務端對應例項的方法呼叫
接下來我們就部分技術的使用進行程式碼片段分析。
動態代理
//todo 代理物件 QueryStudentClient client = (QueryStudentClient)rpcProxyFactory.factoryRemoteInvoker("localhost",8080,QueryStudentClient.class); public class RpcProxyFactory<T> { public T factoryRemoteInvoker(String host, int port, Class interfaces){ //動態代理 return (T) Proxy.newProxyInstance(interfaces.getClassLoader(),new Class[]{interfaces}, new RemoteInvocationHandler(host,port,interfaces)); } } public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; private Class interfaces; public RemoteInvocationHandler(String host, int port, Class interfaces) { this.host = host; this.port = port; this.interfaces = interfaces; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //todo 封裝訊息 RpcContext rpcContext=new RpcContext(); rpcContext.setClassName(interfaces.getName()); rpcContext.setMethodName(method.getName()); rpcContext.setTypes(method.getParameterTypes()); rpcContext.setParams(args); try { //通訊 NettyClient client=new NettyClient(host,port); client.connect(); return client.sendData(rpcContext); }catch (Exception e){ } return null; } }
序列化、反序列化
@Override
protected void initChannel(SocketChannel sc) throws Exception {
handler = new NettyClientHandler(latch);
HessianEncode hessionEncodeHandler=new HessianEncode();
HessianDecode hessionDecodeHandler= new HessianDecode();
LengthFieldPrepender fieldEncoder=new LengthFieldPrepender(2);
// LengthFieldBasedFrameDecoder fieldDecoder = new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2);
// 出站
sc.pipeline().addLast(fieldEncoder);
sc.pipeline().addLast(hessionEncodeHandler);
//入站 LengthFieldBasedFrameDecoder多執行緒下不安全,因此使用new
sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
sc.pipeline().addLast(hessionDecodeHandler);
sc.pipeline().addLast(handler);
}
可以看到在pipeline先後添加了:基於訊息頭的長度設定的粘包半包處理handler、序列化工具、反序列化工具,此處序列化使用的是Hessian。
反射技術
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
RpcContext model=(RpcContext)msg;
Class clazz=null;
if(Registry.map.containsKey(model.getClassName())){
clazz=Registry.map.get(model.getClassName());
}
Object result=null;
try {
Method method=clazz.getMethod(model.getMethodName(),model.getTypes());
result=method.invoke(clazz.newInstance(),model.getParams());
}catch (Exception e){
e.printStackTrace();
}
ctx.channel().writeAndFlush(result);
}
可以看到服務端根據客戶端傳來的類名,去Registry的map中獲取已註冊的類,然後根據返回型別、方法名、引數進行反射呼叫。
Netty非同步呼叫執行緒協作問題
使用netty實現客戶端傳送需要注意的點: