1. 程式人生 > 其它 >Netty學習之實戰RPC框架

Netty學習之實戰RPC框架

RPC的實現方式是本地通過遠端代理物件呼叫遠端服務。在網際網路應用場景中,單體服務極度缺乏彈性伸縮能力,在大規模開發團隊中也不便於開發管理。所以往往會把服務根據模組進行垂直拆分,也就是我們說的SOA服務化。服務拆分後系統跟系統直接的業務互動往往依賴於RPC框架進行通訊。

  通常RPC的服務端會提供對應的介面jar包,客戶端通過rpc框架功能拿到對應介面的代理例項,整個呼叫過程資料的包裝和通訊都是透明的。

一、呼叫流程

  首先先來分析下RPC流程是怎樣的,如下圖:

      

  我們包含三部分,使用者、Netty客戶端,Netty服務端:

  1. 使用者發起呼叫;
  2. Netty客戶端包裝請求;
  3. 客戶端對請求進行序列化(物件轉ByteBuf);
  4. 序列化後傳送訊息到服務端;
  5. 服務端會對請求進行反序列化解碼成具體物件;
  6. 服務端根據客戶端傳送的請求解析並準備返回結果;
  7. 服務端對返回結果序列化為ByteBuf;
  8. 客戶端收到返回資訊;
  9. 客戶端對返回資訊反列化得到Object資訊;
  10. 客戶端把結果返回給使用者呼叫方,完成整個請求。

二、包含技術

  如上所示,就是整個RPC框架的簡單流程,在這個流程中需要使用哪些技術呢?

  • 動態代理:通過java Proxy技術拿到代理物件,invocationHandler實現資料協議包裝和通訊。
  • 序列化、反序列化
  • 網路通訊:基於netty的客戶端和服務端進行通訊可以獲得很好的IO效能
  • 反射:根據客戶端請求引數通過反射技術實現服務端對應例項的方法呼叫

  接下來我們就部分技術的使用進行程式碼片段分析。

  1、動態代理

//todo 代理物件
QueryStudentClient client = (QueryStudentClient)rpcProxyFactory.factoryRemoteInvoker("localhost",8080,QueryStudentClient.class);

public class RpcProxyFactory<T> {

    public T factoryRemoteInvoker(String host, int port, Class interfaces){
        //動態代理
        return (T) Proxy.newProxyInstance(interfaces.getClassLoader(),new Class[]{interfaces},
                new RemoteInvocationHandler
(host,port,interfaces)); } } public class RemoteInvocationHandler implements InvocationHandler { private String host; private int port; private Class interfaces; public RemoteInvocationHandler(String host, int port, Class interfaces) { this.host = host; this.port = port; this.interfaces = interfaces; } public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { //todo 封裝訊息 RpcContext rpcContext=new RpcContext(); rpcContext.setClassName(interfaces.getName()); rpcContext.setMethodName(method.getName()); rpcContext.setTypes(method.getParameterTypes()); rpcContext.setParams(args); try { //通訊 NettyClient client=new NettyClient(host,port); client.connect(); return client.sendData(rpcContext); }catch (Exception e){ } return null; } }

  2、序列化、反序列化

    @Override
    protected void initChannel(SocketChannel sc) throws Exception {
        handler =  new NettyClientHandler(latch);

        HessianEncode hessionEncodeHandler=new HessianEncode();
        HessianDecode hessionDecodeHandler= new HessianDecode();

        LengthFieldPrepender fieldEncoder=new LengthFieldPrepender(2);
//        LengthFieldBasedFrameDecoder fieldDecoder = new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2);

//        出站
        sc.pipeline().addLast(fieldEncoder);
        sc.pipeline().addLast(hessionEncodeHandler);

        //入站        LengthFieldBasedFrameDecoder多執行緒下不安全,因此使用new
        sc.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
        sc.pipeline().addLast(hessionDecodeHandler);
        sc.pipeline().addLast(handler);
    }

  可以看到在pipeline先後添加了:基於訊息頭的長度設定的粘包半包處理handler、序列化工具、反序列化工具,此處序列化使用的是Hessian。

  3、反射技術

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        RpcContext model=(RpcContext)msg;

        Class clazz=null;
        if(Registry.map.containsKey(model.getClassName())){
            clazz=Registry.map.get(model.getClassName());
        }

        Object result=null;
        try {
            Method method=clazz.getMethod(model.getMethodName(),model.getTypes());
            result=method.invoke(clazz.newInstance(),model.getParams());
        }catch (Exception e){
            e.printStackTrace();
        }

        ctx.channel().writeAndFlush(result);
    }

  可以看到服務端根據客戶端傳來的類名,去Registry的map中獲取已註冊的類,然後根據返回型別、方法名、引數進行反射呼叫。

三、Netty非同步呼叫執行緒協作問題

  使用netty實現客戶端傳送需要注意的點:

  通過Netty的channel呼叫寫資料writeAndFlush 寫的事件以及收到響應之後的channelRead事件都是會非同步執行,所以需要注意執行緒協作的問題。可以使用countdowlacth來實現主執行緒等待channelread執行完之後才去獲取收到的響應物件。

    /**
     * 客戶端傳送資料方法
     * @param rpcRequest
     * @return
     * @throws InterruptedException
     */
    public Object sendData(RpcContext rpcRequest) throws InterruptedException {
        ChannelFuture cf = this.getChannelFuture();//單例模式獲取ChannelFuture物件
        if (cf.channel() != null && cf.channel().isActive()) {
            latch=new CountDownLatch(1);
            clientInitializer.reLatch(latch);
            cf.channel().writeAndFlush(rpcRequest);
            latch.await();
        }

        return clientInitializer.getServerResult();
    }
}

  // 客戶端從服務端讀取資料完成
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        result=msg;
        System.out.println("返回資料讀取完畢");
        latch.countDown();
    }

  由此實現了執行緒協作,否則呼叫結果無法得到返回。