Seata原始碼解析——RPC模組底層實現
前言
Seata是一個分散式事務解決方案框架,既然TC和TM、RM是分散式部署的,那麼Seata必定涉及到了網路通訊。Seata底層就是用了Netty,並涉及了自有的RPC協議進行三者的通訊的,本文就從Netty入手,來揭開Seata RPC模組神祕的面紗。
*本文適合瞭解Netty,rpc的小白閱讀。
總覽
Seata的RPC模組位於seata.core模組中:
在netty包中,封裝了seata對netty使用,類圖如下:
*這張圖是按照 Seata RPC 模組的重構之路畫的(作者是seata的貢獻者)
從圖中看到,伺服器(TC),和客戶端(TM,RM)的類還是比較對稱的,Server端的比較簡單,我們以client為例,看看它的傳送一個RPC請求的流程:
AbstractNettyRemotingClient——一個RPC請求方法
這個類主要定義了客戶端傳送同步或非同步rpc請求的方法:
我們來看下sendSyncRequest()方法:重點就是看看seata用netty傳送了什麼(RpcMessage),不需要太深入瞭解它的傳送邏輯。
//省略了部分程式碼 @Override public Object sendSyncRequest(Object msg) throws TimeoutException { // 1.根據負載均衡演算法獲取伺服器節點的ip和port地址。 String serverAddress = loadBalance(getTransactionServiceGroup(), msg); //2.獲取配置的rpc請求超時時間。 long timeoutMillis = this.getRpcRequestTimeout(); //3.建立rpc請求訊息體 RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); //4.判斷是否開啟了批量請求,如果開啟了則走批量請求邏輯 // send batch message // put message into basketMap, @see MergedSendRunnable if (this.isEnableClientBatchSendRequest()) { // send batch message is sync request, needs to create messageFuture and put it in futures. MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); //5.把請求的MessageFuture物件放入future這個concurrenthashmap中,用於rpc返回結果後非同步回撥MessageFuture物件 futures.put(rpcMessage.getId(), messageFuture); // put message into basketMap // 把請求放入阻塞佇列裡,MergedSendRunnable執行緒會讀取阻塞佇列的請求並作合併傳送。 BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress, key -> new LinkedBlockingQueue<>()); if (!basket.offer(rpcMessage)) { LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}", serverAddress, rpcMessage); return null; } if (!isSending) { synchronized (mergeLock) { mergeLock.notifyAll(); } } } else { // 非批處理模式,呼叫netty的程式碼,這裡的clientChannelManager應該是seata對channel做了池化 Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); } }
RpcMessage——RPC協議
上面我們看到,Seata客戶端傳送的MessageFuture
裡面包含了RpcMessage,它是封裝用來RPC協議的。
先看RPC協議的設計:
0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 +-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+ | magic |Proto| Full length | Head | Msg |Seria|Compr| RequestId | | code |colVer| (head+body) | Length |Type |lizer|ess | | +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ | | | Head Map [Optional] | +-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+ | | | body | | | | ... ... | +-----------------------------------------------------------------------------------------------+
Len | Param | Desc | Desc in chinese |
---|---|---|---|
2B | Magic Code | 0xdada | 魔術位 |
1B | ProtocolVersion | 1 | 協議版本:用於非相容性升級 |
4B | FullLength | include front 3 bytes and self 4 bytes | 總長度 :用於拆包,包括前3位和自己4位 |
2B | HeadLength | include front 7 bytes, self 4 bytes, and head map | 頭部長度:包括前面7位,自己4位,以及 HeadMap |
1B | Message type | request(oneway/twoway)/response/heartbeat/callback | 訊息型別:請求(單向/雙向)/響應/心跳/回撥/go away等 |
1B | Serialization | custom, hessian, pb | 序列化型別:內建/hessian/protobuf等 |
1B | CompressType | None/gzip/snappy... | 壓縮演算法:無/gzip/snappy |
4B | MessageId | Integer | 訊息 Id |
2B | TypeCode | code in AbstractMessage | 訊息型別: AbstractMessage 裡的型別 |
?B | HeadMap[Optional] | exists when if head length > 16 | 訊息Map(可選的,如果頭部長度大於16,代表存在HeadMap) |
ATTR_KEY(?B) key:string:length(2B)+data ATTR_TYPE(1B) 1:int; 2:string; 3:byte; 4:short ATTR_VAL(?B) int:(4B); string:length(2B)+data; byte:(1B); short:(2B) } | Key: 字串 Value 型別 Value 值 | ||
?b | Body | (FullLength-HeadLength) |
可以看到Seata的RPC協議頭如果不包含HeadMap(可拓展)的話總共是18B,總體來說還是比較小的。Seata rpc協議設計有一個亮點,那就是支援協議頭拓展(HeadLength、HeadMap[Optional]),他這種設計既靈活效能又高(一些透傳引數可以不用放在Body裡面,這樣就不需要對body進行編解碼直接在協議頭部獲取)
編碼&解碼
Seata中有編碼器ProtocolV1Encoder和解碼器ProtocolV1Decoder,分別繼承了MessageToByteEncoder
和LengthFieldBasedFrameDecoder
簡單瞭解一下這兩個netty提供的類:
MessageToByteEncoder:抽象類,繼承了ChannelOutboundHandlerAdapter
。用於將訊息轉換成位元組,需要實現的只有一個方法void encode(ChannelHandlerContext var1, I var2, ByteBuf var3) throws Exception;
它被呼叫時將會傳入要被該類編碼為 ByteBuf 的(型別為 I 的)出站訊息。該 ByteBuf 隨後將會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler
LengthFieldBasedFrameDecoder:繼承自ByteToMessageDecoder
,因為Seata RPC是基於長度的協議,Netty提供了此類對這種協議進行解碼,並且提供了幾種建構函式來支援各種各樣的頭部配置情況。
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
//maxFrameLength: 最大幀長度,超過此長度的資料會被丟棄
//lengthFieldOffset:長度域偏移。資料開始的幾個位元組可能不是表示資料長度(這裡指魔數和版本號),需要後移幾個位元組才是長度域。 根據上面的定義:這裡是3B
//lengthFieldLength:長度域位元組數。用幾個位元組來表示資料長度。 這裡是4B
//lengthAdjustment:資料長度修正。因為長度域指定的長度可以使header+body的整個長度,也可以只是body的長度。如果表示header+body的整個長度,需要修正資料長度。
//initialBytesToStrip:跳過的位元組數。如果你需要接收header+body的所有資料,此值就是0,如果你只想接收body資料,那麼需要跳過header所佔用的位元組數。
接下來我們看解碼器的解碼方法,它的作用是根據自定義的RPC協議將ByteBuf位元組轉換成RpcMessage
ProtocolV1Decoder#decodeFrame
public Object decodeFrame(ByteBuf frame) {
//1.校驗魔數
byte b0 = frame.readByte();
byte b1 = frame.readByte();
if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
|| ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
}
byte version = frame.readByte(); //版本號
// TODO check version compatible here
int fullLength = frame.readInt();
short headLength = frame.readShort();
byte messageType = frame.readByte();
byte codecType = frame.readByte();
byte compressorType = frame.readByte();
int requestId = frame.readInt();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(codecType); //訊息型別
rpcMessage.setId(requestId); // 訊息ID
rpcMessage.setCompressor(compressorType); // 壓縮演算法
rpcMessage.setMessageType(messageType); //報文型別
// direct read head with zero-copy 如果有拓展請求頭
int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
if (headMapLength > 0) {
Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
rpcMessage.getHeadMap().putAll(map);
}
// read body
if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
rpcMessage.setBody(HeartbeatMessage.PING);
} else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
rpcMessage.setBody(HeartbeatMessage.PONG);
} else {
int bodyLength = fullLength - headLength;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
frame.readBytes(bs);
Compressor compressor = CompressorFactory.getCompressor(compressorType);
bs = compressor.decompress(bs); //解壓縮
Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
rpcMessage.setBody(serializer.deserialize(bs)); //反序列化
}
}
return rpcMessage;
}
編碼部分和解碼的差不多,就不分析了、