1. 程式人生 > 其它 >Seata原始碼解析——RPC模組底層實現

Seata原始碼解析——RPC模組底層實現

目錄

前言

Seata是一個分散式事務解決方案框架,既然TC和TM、RM是分散式部署的,那麼Seata必定涉及到了網路通訊。Seata底層就是用了Netty,並涉及了自有的RPC協議進行三者的通訊的,本文就從Netty入手,來揭開Seata RPC模組神祕的面紗。

*本文適合瞭解Netty,rpc的小白閱讀。

總覽

Seata的RPC模組位於seata.core模組中:

在netty包中,封裝了seata對netty使用,類圖如下:


*這張圖是按照 Seata RPC 模組的重構之路畫的(作者是seata的貢獻者)

從圖中看到,伺服器(TC),和客戶端(TM,RM)的類還是比較對稱的,Server端的比較簡單,我們以client為例,看看它的傳送一個RPC請求的流程:

AbstractNettyRemotingClient——一個RPC請求方法

這個類主要定義了客戶端傳送同步或非同步rpc請求的方法:

我們來看下sendSyncRequest()方法:重點就是看看seata用netty傳送了什麼(RpcMessage),不需要太深入瞭解它的傳送邏輯。

//省略了部分程式碼
  @Override
    public Object sendSyncRequest(Object msg) throws TimeoutException {
	// 1.根據負載均衡演算法獲取伺服器節點的ip和port地址。
        String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
		//2.獲取配置的rpc請求超時時間。
        long timeoutMillis = this.getRpcRequestTimeout();
		//3.建立rpc請求訊息體
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);

		//4.判斷是否開啟了批量請求,如果開啟了則走批量請求邏輯
        // send batch message
        // put message into basketMap, @see MergedSendRunnable
        if (this.isEnableClientBatchSendRequest()) {

            // send batch message is sync request, needs to create messageFuture and put it in futures.
            MessageFuture messageFuture = new MessageFuture();
            messageFuture.setRequestMessage(rpcMessage);
            messageFuture.setTimeout(timeoutMillis);
			//5.把請求的MessageFuture物件放入future這個concurrenthashmap中,用於rpc返回結果後非同步回撥MessageFuture物件
            futures.put(rpcMessage.getId(), messageFuture);

            // put message into basketMap
			// 把請求放入阻塞佇列裡,MergedSendRunnable執行緒會讀取阻塞佇列的請求並作合併傳送。
            BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
                key -> new LinkedBlockingQueue<>());
            if (!basket.offer(rpcMessage)) {
                LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
                        serverAddress, rpcMessage);
                return null;
            }
            if (!isSending) {
                synchronized (mergeLock) {
                    mergeLock.notifyAll();
                }
            }
        } else {
		// 非批處理模式,呼叫netty的程式碼,這裡的clientChannelManager應該是seata對channel做了池化
            Channel channel = clientChannelManager.acquireChannel(serverAddress);
            return super.sendSync(channel, rpcMessage, timeoutMillis);
        }

    }

RpcMessage——RPC協議

上面我們看到,Seata客戶端傳送的MessageFuture裡面包含了RpcMessage,它是封裝用來RPC協議的。

先看RPC協議的設計:

0     1     2     3     4     5     6     7     8     9     10    11    12    13    14    15    16
+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|   magic   |Proto|     Full length       |    Head   | Msg |Seria|Compr|     RequestId         |
|   code    |colVer|    (head+body)      |   Length  |Type |lizer|ess  |                       |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|                                                                                               |
|                                   Head Map [Optional]                                         |
+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|                                                                                               |
|                                         body                                                  |
|                                                                                               |
|                                        ... ...                                                |
+-----------------------------------------------------------------------------------------------+
Len Param Desc Desc in chinese
2B Magic Code 0xdada 魔術位
1B ProtocolVersion 1 協議版本:用於非相容性升級
4B FullLength include front 3 bytes and self 4 bytes 總長度 :用於拆包,包括前3位和自己4位
2B HeadLength include front 7 bytes, self 4 bytes, and head map 頭部長度:包括前面7位,自己4位,以及 HeadMap
1B Message type request(oneway/twoway)/response/heartbeat/callback 訊息型別:請求(單向/雙向)/響應/心跳/回撥/go away等
1B Serialization custom, hessian, pb 序列化型別:內建/hessian/protobuf等
1B CompressType None/gzip/snappy... 壓縮演算法:無/gzip/snappy
4B MessageId Integer 訊息 Id
2B TypeCode code in AbstractMessage 訊息型別: AbstractMessage 裡的型別
?B HeadMap[Optional] exists when if head length > 16 訊息Map(可選的,如果頭部長度大於16,代表存在HeadMap)
ATTR_KEY(?B) key:string:length(2B)+data ATTR_TYPE(1B) 1:int; 2:string; 3:byte; 4:short ATTR_VAL(?B) int:(4B); string:length(2B)+data; byte:(1B); short:(2B) } Key: 字串 Value 型別 Value 值
?b Body (FullLength-HeadLength)

可以看到Seata的RPC協議頭如果不包含HeadMap(可拓展)的話總共是18B,總體來說還是比較小的。Seata rpc協議設計有一個亮點,那就是支援協議頭拓展(HeadLength、HeadMap[Optional]),他這種設計既靈活效能又高(一些透傳引數可以不用放在Body裡面,這樣就不需要對body進行編解碼直接在協議頭部獲取)

編碼&解碼

Seata中有編碼器ProtocolV1Encoder和解碼器ProtocolV1Decoder,分別繼承了MessageToByteEncoderLengthFieldBasedFrameDecoder
簡單瞭解一下這兩個netty提供的類:
MessageToByteEncoder:抽象類,繼承了ChannelOutboundHandlerAdapter。用於將訊息轉換成位元組,需要實現的只有一個方法
void encode(ChannelHandlerContext var1, I var2, ByteBuf var3) throws Exception;
它被呼叫時將會傳入要被該類編碼為 ByteBuf 的(型別為 I 的)出站訊息。該 ByteBuf 隨後將會被轉發給 ChannelPipeline中的下一個 ChannelOutboundHandler

LengthFieldBasedFrameDecoder:繼承自ByteToMessageDecoder,因為Seata RPC是基於長度的協議,Netty提供了此類對這種協議進行解碼,並且提供了幾種建構函式來支援各種各樣的頭部配置情況。

public LengthFieldBasedFrameDecoder(
            int maxFrameLength,
            int lengthFieldOffset, int lengthFieldLength,
            int lengthAdjustment, int initialBytesToStrip)
//maxFrameLength: 最大幀長度,超過此長度的資料會被丟棄
//lengthFieldOffset:長度域偏移。資料開始的幾個位元組可能不是表示資料長度(這裡指魔數和版本號),需要後移幾個位元組才是長度域。 根據上面的定義:這裡是3B 
//lengthFieldLength:長度域位元組數。用幾個位元組來表示資料長度。 這裡是4B
//lengthAdjustment:資料長度修正。因為長度域指定的長度可以使header+body的整個長度,也可以只是body的長度。如果表示header+body的整個長度,需要修正資料長度。
//initialBytesToStrip:跳過的位元組數。如果你需要接收header+body的所有資料,此值就是0,如果你只想接收body資料,那麼需要跳過header所佔用的位元組數。

接下來我們看解碼器的解碼方法,它的作用是根據自定義的RPC協議將ByteBuf位元組轉換成RpcMessage
ProtocolV1Decoder#decodeFrame

public Object decodeFrame(ByteBuf frame) {
		//1.校驗魔數
        byte b0 = frame.readByte();
        byte b1 = frame.readByte();
        if (ProtocolConstants.MAGIC_CODE_BYTES[0] != b0
                || ProtocolConstants.MAGIC_CODE_BYTES[1] != b1) {
            throw new IllegalArgumentException("Unknown magic code: " + b0 + ", " + b1);
        }

        byte version = frame.readByte();   //版本號
        // TODO  check version compatible here
		
        int fullLength = frame.readInt();
        short headLength = frame.readShort();
        byte messageType = frame.readByte();
        byte codecType = frame.readByte();
        byte compressorType = frame.readByte();
        int requestId = frame.readInt();

        RpcMessage rpcMessage = new RpcMessage();
        rpcMessage.setCodec(codecType);   //訊息型別
        rpcMessage.setId(requestId); // 訊息ID
        rpcMessage.setCompressor(compressorType); // 壓縮演算法
        rpcMessage.setMessageType(messageType); //報文型別

        // direct read head with zero-copy 如果有拓展請求頭
        int headMapLength = headLength - ProtocolConstants.V1_HEAD_LENGTH;
        if (headMapLength > 0) {
            Map<String, String> map = HeadMapSerializer.getInstance().decode(frame, headMapLength);
            rpcMessage.getHeadMap().putAll(map);
        }

        // read body
        if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST) {
            rpcMessage.setBody(HeartbeatMessage.PING);
        } else if (messageType == ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
            rpcMessage.setBody(HeartbeatMessage.PONG);
        } else {
            int bodyLength = fullLength - headLength;
            if (bodyLength > 0) {
                byte[] bs = new byte[bodyLength];
                frame.readBytes(bs);
                Compressor compressor = CompressorFactory.getCompressor(compressorType);
                bs = compressor.decompress(bs); //解壓縮
                Serializer serializer = SerializerServiceLoader.load(SerializerType.getByCode(rpcMessage.getCodec()));
                rpcMessage.setBody(serializer.deserialize(bs));  //反序列化
            }
        }

        return rpcMessage;
    }

編碼部分和解碼的差不多,就不分析了、

參考連結:

seata原始碼解析:RPC模組詳解

Seata解析-seata核心類NettyRemotingServer詳解

Seata RPC 模組的重構之路