Netty學習之實戰RPC框架
阿新 • • 發佈:2021-07-31
RPC的實現方式是本地通過遠端代理物件呼叫遠端服務。在網際網路應用場景中,單體服務極度缺乏彈性伸縮能力,在大規模開發團隊中也不便於開發管理。所以往往會把服務根據模組進行垂直拆分,也就是我們說的SOA服務化。服務拆分後系統跟系統直接的業務互動往往依賴於RPC框架進行通訊。
通常RPC的服務端會提供對應的介面jar包,客戶端通過rpc框架功能拿到對應介面的代理例項,整個呼叫過程資料的包裝和通訊都是透明的。
一、呼叫流程
首先先來分析下RPC流程是怎樣的,如下圖:
我們包含三部分,使用者、Netty客戶端,Netty服務端:
- 使用者發起呼叫;
- Netty客戶端包裝請求;
- 客戶端對請求進行序列化(物件轉ByteBuf);
- 序列化後傳送訊息到服務端;
- 服務端會對請求進行反序列化解碼成具體物件;
- 服務端根據客戶端傳送的請求解析並準備返回結果;
- 服務端對返回結果序列化為ByteBuf;
- 客戶端收到返回資訊;
- 客戶端對返回資訊反列化得到Object資訊;
- 客戶端把結果返回給使用者呼叫方,完成整個請求。
二、包含技術
如上所示,就是整個RPC框架的簡單流程,在這個流程中需要使用哪些技術呢?
- 動態代理:通過java Proxy技術拿到代理物件,invocationHandler實現資料協議包裝和通訊。
- 序列化、反序列化
- 網路通訊:基於netty的客戶端和服務端進行通訊可以獲得很好的IO效能
- 反射:根據客戶端請求引數通過反射技術實現服務端對應例項的方法呼叫
接下來我們就部分技術的使用進行程式碼片段分析。
1、動態代理
//todo 代理物件 QueryStudentClient client = (QueryStudentClient)rpcProxyFactory.factoryRemoteInvoker("localhost",8080,QueryStudentClient.class); public class RpcProxyFactory<T> { public T factoryRemoteInvoker(String host, int port, Class interfaces){ //動態代理 return (T) Proxy.newProxyInstance(interfaces.getClassLoader(),new Class[]{interfaces}, new RemoteInvocationHandler(host,port,interfaces)); } } public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; private Class interfaces; public RemoteInvocationHandler(String host, int port, Class interfaces) { this.host = host; this.port = port; this.interfaces = interfaces; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //todo 封裝訊息 RpcContext rpcContext=new RpcContext(); rpcContext.setClassName(interfaces.getName()); rpcContext.setMethodName(method.getName()); rpcContext.setTypes(method.getParameterTypes()); rpcContext.setParams(args); try { //通訊 NettyClient client=new NettyClient(host,port); client.connect(); return client.sendData(rpcContext); }catch (Exception e){ } return null; } }
2、序列化、反序列化
@Override protected void initChannel(SocketChannel sc) throws Exception { handler = new NettyClientHandler(latch); HessianEncode hessionEncodeHandler=new HessianEncode(); HessianDecode hessionDecodeHandler= new HessianDecode(); LengthFieldPrepender fieldEncoder=new LengthFieldPrepender(2); // LengthFieldBasedFrameDecoder fieldDecoder = new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2); // 出站 sc.pipeline().addLast(fieldEncoder); sc.pipeline().addLast(hessionEncodeHandler); //入站 LengthFieldBasedFrameDecoder多執行緒下不安全,因此使用new sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2)); sc.pipeline().addLast(hessionDecodeHandler); sc.pipeline().addLast(handler); }
可以看到在pipeline先後添加了:基於訊息頭的長度設定的粘包半包處理handler、序列化工具、反序列化工具,此處序列化使用的是Hessian。
3、反射技術
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcContext model=(RpcContext)msg; Class clazz=null; if(Registry.map.containsKey(model.getClassName())){ clazz=Registry.map.get(model.getClassName()); } Object result=null; try { Method method=clazz.getMethod(model.getMethodName(),model.getTypes()); result=method.invoke(clazz.newInstance(),model.getParams()); }catch (Exception e){ e.printStackTrace(); } ctx.channel().writeAndFlush(result); }
可以看到服務端根據客戶端傳來的類名,去Registry的map中獲取已註冊的類,然後根據返回型別、方法名、引數進行反射呼叫。
三、Netty非同步呼叫執行緒協作問題
使用netty實現客戶端傳送需要注意的點:
通過Netty的channel呼叫寫資料writeAndFlush 寫的事件以及收到響應之後的channelRead事件都是會非同步執行,所以需要注意執行緒協作的問題。可以使用countdowlacth來實現主執行緒等待channelread執行完之後才去獲取收到的響應物件。
/** * 客戶端傳送資料方法 * @param rpcRequest * @return * @throws InterruptedException */ public Object sendData(RpcContext rpcRequest) throws InterruptedException { ChannelFuture cf = this.getChannelFuture();//單例模式獲取ChannelFuture物件 if (cf.channel() != null && cf.channel().isActive()) { latch=new CountDownLatch(1); clientInitializer.reLatch(latch); cf.channel().writeAndFlush(rpcRequest); latch.await(); } return clientInitializer.getServerResult(); } } // 客戶端從服務端讀取資料完成 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); result=msg; System.out.println("返回資料讀取完畢"); latch.countDown(); }
由此實現了執行緒協作,否則呼叫結果無法得到返回。