1. 程式人生 > >《Dubbo進階一》——RPC協議底層原理

《Dubbo進階一》——RPC協議底層原理

一 RPC協議簡介

在一個典型的RPC的使用場景中,包含了服務發現、負載、容錯、序列化和網路傳輸等元件,其中RPC協議指明瞭程式如何進行序列化和網路傳輸,也就是說一個RPC協議的實現等於一個非透明的RPC呼叫。
在這裡插入圖片描述
簡單來說,分散式框架的核心是RPC框架,RPC框架的核心是RPC協議

dubbo 支援的RPC協議列表

名稱 實現描述 連線描述 使用場景
dubbo 傳輸服務: mina, netty(預設), grizzy; 序列化: dubbo, hessian2(預設), java, fastjson。 自定義報文 單個長連線NIO;非同步傳輸 1.常規RPC呼叫 2.傳輸資料量小 3.提供者少於消費者
rmi 傳輸:java rmi 服務; 序列化:java原生二進位制序列化 多個短連線; BIO同步傳輸 1.常規RPC呼叫 2.與原RMI客戶端整合 3.可傳少量檔案 4.不支援防火牆穿透
hessian 傳輸服務:servlet容器; 序列化:hessian二進位制序列化 基於Http 協議傳輸,依懶servlet容器配置 1.提供者多於消費者 2.可傳大欄位和檔案 3.跨語言呼叫
http 傳輸服務:servlet容器; 序列化:http表單 依懶servlet容器配置 1、資料包大小混合
thrift 與thrift RPC 實現整合,並在其基礎上修改了報文頭 長連線、NIO非同步傳輸

(PS:本文只探討dubbo協議)

二 協議的基本組成

在這裡插入圖片描述

  1. IP:服務提供者的地址
  2. 埠:協議指定開放埠
  3. 執行服務
    (1)netty
    (2)mima
    (3)rmi
    (4)servlet容器(Jetty、Tomcat、Jboss)
  4. 協議報文編碼
  5. 序列化方式
    (1)Hessian2Serialization
    (2)DubboSerialization
    (3)JavaSerialization
    (4)JsonSerialization

三 Duboo的RPC協議報文

先看下http協議報文格式
在這裡插入圖片描述
在這裡插入圖片描述
同樣,Dubbo也有自己的報文格式
在這裡插入圖片描述
以head+request body或head+response body的形式存在

  • head
    1標誌位:表明是請求還是響應還是事件
    2status:表明狀態是OK還是不OK
  • request body
    1Dubbo版本號
    2介面路徑
    3介面版本
    4方法名稱
    5引數型別
    6引數值
  • response body
    1結果標誌(無結果、有結果、異常)
    2結果

協議的編解碼過程:
在這裡插入圖片描述

四 原始碼探究

以明晰編碼解碼和序列化反序列化為目的探究原始碼。其實就是如上圖所示的協議的編解碼過程。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一個類,無論是request還是response,還有編碼解碼都在這裡類進行排程。

DubboCodec:
在這裡插入圖片描述
其中重點關注三個方法
decodeBody():解碼(請求或響應)以及序列化和反序列化
encodeRequestData():編碼請求(發生在Consumer)
encodeResponseData():編碼響應(發生在Provider)

1.編碼序列化request

發生在Consumer發請求之前
encodeRequestData()

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation)data;
        out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
        out.writeUTF(inv.getAttachment("path"));
        out.writeUTF(inv.getAttachment("version"));
        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for(int i = 0; i < args.length; ++i) {
                out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
            }
        }

        out.writeObject(inv.getAttachments());
    }

引數ObjectOutput是序列化介面,具體呼叫什麼實現類有配置決定,如沒有則預設是hessian2。能用的子類(序列化方式)如下
在這裡插入圖片描述

RpcInvocation拿到datadata是請求的基本內容,也就是第三部分所說的request body的六個模組:Dubbo版本號、介面路徑、介面版本、方法名稱、引數型別、引數值。
writeUTF()將版本號、介面路徑、介面版本、方法名和引數稱寫進序列化類。
最後的writeObject() 通過配置的序列化方式呼叫相應的實現類進行序列化,如在protocol配置了serialization=“fastjson”,將呼叫FastJsonObjectOutput實現類的writeObject()
在這裡插入圖片描述
編碼序列化request完成

2.編碼序列化response

發生在Provider發出響應之前。
encodeResponseData

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result)data;
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte((byte)2);
            } else {
                out.writeByte((byte)1);
                out.writeObject(ret);
            }
        } else {
            out.writeByte((byte)0);
            out.writeObject(th);
        }

    }

過程與編碼序列化request類似且較為簡單,不再多說。

3.解碼反序列化request和response

解碼反序列化request發生在Provider;解碼反序列化response發生在Consumer。兩個方法在同個方法中,就一起講了。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
       byte flag = header[2];
       byte proto = (byte)(flag & 31);
       Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
       long id = Bytes.bytes2long(header, 4);
       if ((flag & -128) == 0) {
           Response res = new Response(id);
           if ((flag & 32) != 0) {
               res.setEvent(Response.HEARTBEAT_EVENT);
           }

           byte status = header[3];
           res.setStatus(status);
           if (status == 20) {
               try {
                   Object data;
                   if (res.isHeartbeat()) {
                       data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else if (res.isEvent()) {
                       data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else {
                       DecodeableRpcResult result;
                       if (channel.getUrl().getParameter("decode.in.io", true)) {
                           result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
                           result.decode();
                       } else {
                           result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
                       }

                       data = result;
                   }

                   res.setResult(data);
               } catch (Throwable var13) {
                   if (log.isWarnEnabled()) {
                       log.warn("Decode response failed: " + var13.getMessage(), var13);
                   }

                   res.setStatus((byte)90);
                   res.setErrorMessage(StringUtils.toString(var13));
               }
           } else {
               res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
           }

           return res;
       } else {
           Request req = new Request(id);
           req.setVersion("2.0.0");
           req.setTwoWay((flag & 64) != 0);
           if ((flag & 32) != 0) {
               req.setEvent(Request.HEARTBEAT_EVENT);
           }

           try {
               Object data;
               if (req.isHeartbeat()) {
                   data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
               } else if (req.isEvent()) {
                   data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
               } else {
                   DecodeableRpcInvocation inv;
                   if (channel.getUrl().getParameter("decode.in.io", true)) {
                       inv = new DecodeableRpcInvocation(channel, req, is, proto);
                       inv.decode();
                   } else {
                       inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
                   }

                   data = inv;
               }

               req.setData(data);
           } catch (Throwable var14) {
               if (log.isWarnEnabled()) {
                   log.warn("Decode request failed: " + var14.getMessage(), var14);
               }

               req.setBroken(true);
               req.setData(var14);
           }

           return req;
       }
   }

需要注意的是來到這個方法表明請求頭已經處理好,現在是處理body。
flag通過header拿到標誌位。
第一個if語句(flag & -128) == 0,實際上是在判斷是request還是response,若為true為response,也就是Consumer要解碼反序列化從Provider發來的響應;若為false為request,也就是Provider要解碼反序列化從Consumer發來的請求。

(1)解碼反序列化request

(flag & -128) == 0為false時,進入else執行體,在服務端進行操作。
if ((flag & 32) != 0)在判斷是否時一個心跳事件,心跳事件時為了檢測連線是否斷開以備重連。
if (req.isHeartbeat())判斷是否時一個心跳事件,else if (req.isEvent())判斷是否時一個事件
排除了這兩個之後就是真正的request。
inv拿到request相關引數,inv.decode()進行解碼和反序列化。
呼叫DecodeableRpcInvocationdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        this.setAttachment("dubbo", in.readUTF());
        this.setAttachment("path", in.readUTF());
        this.setAttachment("version", in.readUTF());
        this.setMethodName(in.readUTF());

        try {
            String desc = in.readUTF();
            Object[] args;
            Class[] pts;
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];

                for(int i = 0; i < args.length; ++i) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception var9) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + var9.getMessage(), var9);
                        }
                    }
                }
            }

            this.setParameterTypes(pts);
            Map<String, String> map = (Map)in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = this.getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }

                ((Map)attachment).putAll(map);
                this.setAttachments((Map)attachment);
            }

            for(int i = 0; i < args.length; ++i) {
                args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            this.setArguments(args);
            return this;
        } catch (ClassNotFoundException var10) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
        }
    }

其中ObjectInput選擇的序列化方式實現子類依然時根據配置檔案來的,只有與客戶端序列化的方式一樣才能反序列化成功。接下來是逐個readUTF()解碼request body的模組。try程式碼塊裡的readUTF()解碼出引數型別和引數值。最後將dubbo的隱式引數也一同設定進去Map<String, String> map = (Map)in.readObject(Map.class),到這裡DecodeableRpcInvocation拿到所有相關引數,後續可以進行業務操作。
解碼反序列化request完成

(2)解碼反序列化response

(flag & -128) == 0為true時,進入if執行體,在客戶端進行操作。
if ((flag & 32) != 0)在判斷是否時一個心跳事件,心跳事件時為了檢測連線是否斷開以備重連。
status從header拿到狀態碼,如果不等於20,直接進入else執行錯誤資訊寫入到responseres.setErrorMessage()
if (req.isHeartbeat()判斷是否時一個心跳事件,else if (req.isEvent()判斷是否時一個事件
排除了這兩個之後就是真正的response。
result拿到response相關引數,result .decode()進行解碼和反序列化。
呼叫DecodeableRpcResultdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        byte flag = in.readByte();
        switch(flag) {
        case 0:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }

                this.setException((Throwable)obj);
                break;
            } catch (ClassNotFoundException var6) {
                throw new IOException(StringUtils.toString("Read response data failed.", var6));
            }
        case 1:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
                this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
            } catch (ClassNotFoundException var7) {
                throw new IOException(StringUtils.toString("Read response data failed.", var7));
            }
        case 2:
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }

        return this;
    }

一開始就呼叫getSerialization()進行反序列化,然後賦給ObjectInput。
判斷flag,0為發生異常,並處理異常資訊;2為沒值,直接退出方法。
當等於1時對response進行解碼,呼叫setValue()將資訊讀出來。
解碼反序列化response完成

4.業務呼叫

在這裡插入圖片描述
瞭解是如何編碼序列化等操作之後,最後看下服務端接收到請求整個流程是如何呼叫的。(客戶端接收到響應類似)
在這裡插入圖片描述
以dubbo預設的傳輸服務netty為例,存在一個重要的類:
com\alibaba\dubbo\remoting\transport\netty\NettyServer.class
(客戶端為NettyClient)
在這裡插入圖片描述
其中的doOpen()方法,表示開啟服務

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

三個pipeline.addLast()操作對應解碼、編碼以及解碼後的操作。編解碼上面已經說過,這裡主要探究解碼後的操作。

解碼完成後帶著引數發起對AllDispatcher類的呼叫

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public AllDispatcher() {
    }

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

可以看到它又呼叫了ChannelHandler介面來處理,最終是返回呼叫AllChannelHandler實現類。
在這裡插入圖片描述
其中在received()方法中進行執行緒派發

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
        }
    }

傳進來的引數Object message包含request。
ExecutorService cexecutor拿到對應的執行緒池。
呼叫cexecutor.execute()執行,執行時呼叫了ChannelEventRunnable,在ChannelEventRunnable這個類的run()方法就呼叫了我們自己寫的業務方法。