《Dubbo進階一》——RPC協議底層原理
一 RPC協議簡介
在一個典型的RPC的使用場景中,包含了服務發現、負載、容錯、序列化和網路傳輸等元件,其中RPC協議指明瞭程式如何進行序列化和網路傳輸,也就是說一個RPC協議的實現等於一個非透明的RPC呼叫。
簡單來說,分散式框架的核心是RPC框架,RPC框架的核心是RPC協議。
dubbo 支援的RPC協議列表
名稱 | 實現描述 | 連線描述 | 使用場景 |
---|---|---|---|
dubbo | 傳輸服務: mina, netty(預設), grizzy; 序列化: dubbo, hessian2(預設), java, fastjson。 自定義報文 | 單個長連線NIO;非同步傳輸 | 1.常規RPC呼叫 2.傳輸資料量小 3.提供者少於消費者 |
rmi | 傳輸:java rmi 服務; 序列化:java原生二進位制序列化 | 多個短連線; BIO同步傳輸 | 1.常規RPC呼叫 2.與原RMI客戶端整合 3.可傳少量檔案 4.不支援防火牆穿透 |
hessian | 傳輸服務:servlet容器; 序列化:hessian二進位制序列化 | 基於Http 協議傳輸,依懶servlet容器配置 | 1.提供者多於消費者 2.可傳大欄位和檔案 3.跨語言呼叫 |
http | 傳輸服務:servlet容器; 序列化:http表單 | 依懶servlet容器配置 | 1、資料包大小混合 |
thrift | 與thrift RPC 實現整合,並在其基礎上修改了報文頭 | 長連線、NIO非同步傳輸 |
(PS:本文只探討dubbo協議)
二 協議的基本組成
- IP:服務提供者的地址
- 埠:協議指定開放埠
- 執行服務
(1)netty
(2)mima
(3)rmi
(4)servlet容器(Jetty、Tomcat、Jboss) - 協議報文編碼
- 序列化方式
(1)Hessian2Serialization
(2)DubboSerialization
(3)JavaSerialization
(4)JsonSerialization
三 Duboo的RPC協議報文
先看下http協議報文格式
同樣,Dubbo也有自己的報文格式
以head+request body或head+response body的形式存在
- head
1標誌位:表明是請求還是響應還是事件
2status:表明狀態是OK還是不OK - request body
1Dubbo版本號
2介面路徑
3介面版本
4方法名稱
5引數型別
6引數值 - response body
1結果標誌(無結果、有結果、異常)
2結果
協議的編解碼過程:
四 原始碼探究
以明晰編碼解碼和序列化反序列化為目的探究原始碼。其實就是如上圖所示的協議的編解碼過程。
com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一個類,無論是request還是response,還有編碼解碼都在這裡類進行排程。
DubboCodec:
其中重點關注三個方法
decodeBody():解碼(請求或響應)以及序列化和反序列化
encodeRequestData():編碼請求(發生在Consumer)
encodeResponseData():編碼響應(發生在Provider)
1.編碼序列化request
發生在Consumer發請求之前
encodeRequestData()
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
RpcInvocation inv = (RpcInvocation)data;
out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
out.writeUTF(inv.getAttachment("path"));
out.writeUTF(inv.getAttachment("version"));
out.writeUTF(inv.getMethodName());
out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
Object[] args = inv.getArguments();
if (args != null) {
for(int i = 0; i < args.length; ++i) {
out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
}
}
out.writeObject(inv.getAttachments());
}
引數ObjectOutput是序列化介面,具體呼叫什麼實現類有配置決定,如沒有則預設是hessian2。能用的子類(序列化方式)如下
RpcInvocation拿到data
,data
是請求的基本內容,也就是第三部分所說的request body的六個模組:Dubbo版本號、介面路徑、介面版本、方法名稱、引數型別、引數值。
writeUTF()
將版本號、介面路徑、介面版本、方法名和引數稱寫進序列化類。
最後的writeObject()
通過配置的序列化方式呼叫相應的實現類進行序列化,如在protocol配置了serialization=“fastjson”,將呼叫FastJsonObjectOutput實現類的writeObject()
編碼序列化request完成
2.編碼序列化response
發生在Provider發出響應之前。
encodeResponseData
protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
Result result = (Result)data;
Throwable th = result.getException();
if (th == null) {
Object ret = result.getValue();
if (ret == null) {
out.writeByte((byte)2);
} else {
out.writeByte((byte)1);
out.writeObject(ret);
}
} else {
out.writeByte((byte)0);
out.writeObject(th);
}
}
過程與編碼序列化request類似且較為簡單,不再多說。
3.解碼反序列化request和response
解碼反序列化request發生在Provider;解碼反序列化response發生在Consumer。兩個方法在同個方法中,就一起講了。
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2];
byte proto = (byte)(flag & 31);
Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
long id = Bytes.bytes2long(header, 4);
if ((flag & -128) == 0) {
Response res = new Response(id);
if ((flag & 32) != 0) {
res.setEvent(Response.HEARTBEAT_EVENT);
}
byte status = header[3];
res.setStatus(status);
if (status == 20) {
try {
Object data;
if (res.isHeartbeat()) {
data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
} else if (res.isEvent()) {
data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter("decode.in.io", true)) {
result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} catch (Throwable var13) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + var13.getMessage(), var13);
}
res.setStatus((byte)90);
res.setErrorMessage(StringUtils.toString(var13));
}
} else {
res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
}
return res;
} else {
Request req = new Request(id);
req.setVersion("2.0.0");
req.setTwoWay((flag & 64) != 0);
if ((flag & 32) != 0) {
req.setEvent(Request.HEARTBEAT_EVENT);
}
try {
Object data;
if (req.isHeartbeat()) {
data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
} else if (req.isEvent()) {
data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter("decode.in.io", true)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
}
data = inv;
}
req.setData(data);
} catch (Throwable var14) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + var14.getMessage(), var14);
}
req.setBroken(true);
req.setData(var14);
}
return req;
}
}
需要注意的是來到這個方法表明請求頭已經處理好,現在是處理body。
flag通過header拿到標誌位。
第一個if語句(flag & -128) == 0
,實際上是在判斷是request還是response,若為true為response,也就是Consumer要解碼反序列化從Provider發來的響應;若為false為request,也就是Provider要解碼反序列化從Consumer發來的請求。
(1)解碼反序列化request
當(flag & -128) == 0
為false時,進入else執行體,在服務端進行操作。
if ((flag & 32) != 0)
在判斷是否時一個心跳事件,心跳事件時為了檢測連線是否斷開以備重連。
if (req.isHeartbeat())
判斷是否時一個心跳事件,else if (req.isEvent())
判斷是否時一個事件
排除了這兩個之後就是真正的request。
inv
拿到request相關引數,inv.decode()
進行解碼和反序列化。
呼叫DecodeableRpcInvocation的decode()
方法如下
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
this.setAttachment("dubbo", in.readUTF());
this.setAttachment("path", in.readUTF());
this.setAttachment("version", in.readUTF());
this.setMethodName(in.readUTF());
try {
String desc = in.readUTF();
Object[] args;
Class[] pts;
if (desc.length() == 0) {
pts = DubboCodec.EMPTY_CLASS_ARRAY;
args = DubboCodec.EMPTY_OBJECT_ARRAY;
} else {
pts = ReflectUtils.desc2classArray(desc);
args = new Object[pts.length];
for(int i = 0; i < args.length; ++i) {
try {
args[i] = in.readObject(pts[i]);
} catch (Exception var9) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + var9.getMessage(), var9);
}
}
}
}
this.setParameterTypes(pts);
Map<String, String> map = (Map)in.readObject(Map.class);
if (map != null && map.size() > 0) {
Map<String, String> attachment = this.getAttachments();
if (attachment == null) {
attachment = new HashMap();
}
((Map)attachment).putAll(map);
this.setAttachments((Map)attachment);
}
for(int i = 0; i < args.length; ++i) {
args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
}
this.setArguments(args);
return this;
} catch (ClassNotFoundException var10) {
throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
}
}
其中ObjectInput
選擇的序列化方式實現子類依然時根據配置檔案來的,只有與客戶端序列化的方式一樣才能反序列化成功。接下來是逐個readUTF()解碼request body的模組。try程式碼塊裡的readUTF()解碼出引數型別和引數值。最後將dubbo的隱式引數也一同設定進去Map<String, String> map = (Map)in.readObject(Map.class)
,到這裡DecodeableRpcInvocation拿到所有相關引數,後續可以進行業務操作。
解碼反序列化request完成
(2)解碼反序列化response
當(flag & -128) == 0
為true時,進入if執行體,在客戶端進行操作。
if ((flag & 32) != 0)
在判斷是否時一個心跳事件,心跳事件時為了檢測連線是否斷開以備重連。
status
從header拿到狀態碼,如果不等於20
,直接進入else執行錯誤資訊寫入到responseres.setErrorMessage()
。
if (req.isHeartbeat()
判斷是否時一個心跳事件,else if (req.isEvent()
判斷是否時一個事件
排除了這兩個之後就是真正的response。
result
拿到response相關引數,result .decode()
進行解碼和反序列化。
呼叫DecodeableRpcResult的decode()
方法如下
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
byte flag = in.readByte();
switch(flag) {
case 0:
try {
Object obj = in.readObject();
if (!(obj instanceof Throwable)) {
throw new IOException("Response data error, expect Throwable, but get " + obj);
}
this.setException((Throwable)obj);
break;
} catch (ClassNotFoundException var6) {
throw new IOException(StringUtils.toString("Read response data failed.", var6));
}
case 1:
try {
Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
} catch (ClassNotFoundException var7) {
throw new IOException(StringUtils.toString("Read response data failed.", var7));
}
case 2:
break;
default:
throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
}
return this;
}
一開始就呼叫getSerialization()
進行反序列化,然後賦給ObjectInput。
判斷flag,0
為發生異常,並處理異常資訊;2
為沒值,直接退出方法。
當等於1
時對response進行解碼,呼叫setValue()
將資訊讀出來。
解碼反序列化response完成
4.業務呼叫
瞭解是如何編碼序列化等操作之後,最後看下服務端接收到請求整個流程是如何呼叫的。(客戶端接收到響應類似)
以dubbo預設的傳輸服務netty為例,存在一個重要的類:
com\alibaba\dubbo\remoting\transport\netty\NettyServer.class
(客戶端為NettyClient)
其中的doOpen()方法,表示開啟服務
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
this.bootstrap = new ClientBootstrap(channelFactory);
this.bootstrap.setOption("keepAlive", true);
this.bootstrap.setOption("tcpNoDelay", true);
this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
三個pipeline.addLast()
操作對應解碼、編碼以及解碼後的操作。編解碼上面已經說過,這裡主要探究解碼後的操作。
解碼完成後帶著引數發起對AllDispatcher
類的呼叫
public class AllDispatcher implements Dispatcher {
public static final String NAME = "all";
public AllDispatcher() {
}
public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new AllChannelHandler(handler, url);
}
}
可以看到它又呼叫了ChannelHandler
介面來處理,最終是返回呼叫AllChannelHandler
實現類。
其中在received()
方法中進行執行緒派發
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = this.getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
} catch (Throwable var8) {
if (message instanceof Request && var8 instanceof RejectedExecutionException) {
Request request = (Request)message;
if (request.isTwoWay()) {
String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
Response response = new Response(request.getId(), request.getVersion());
response.setStatus((byte)100);
response.setErrorMessage(msg);
channel.send(response);
return;
}
}
throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
}
}
傳進來的引數Object message
包含request。
ExecutorService cexecutor
拿到對應的執行緒池。
呼叫cexecutor.execute()
執行,執行時呼叫了ChannelEventRunnable
,在ChannelEventRunnable
這個類的run()
方法就呼叫了我們自己寫的業務方法。