程式設計思想:如何設計一個好的通訊網路協議
阿新 • • 發佈:2020-03-30
當網路中兩個程序需要通訊時,我們往往會使用 `Socket` 來實現。`Socket` 都不陌生。當三次握手成功後,客戶端與服務端就能通訊,並且,彼此之間通訊的資料包格式都是二進位制,由` TCP/IP` 協議負責傳輸。
當客戶端和服務端取得了二進位制資料包後,我們往往需要『萃取』出想要的資料,這樣才能更好的執行業務邏輯。所以,我們需要定義好資料結構來描述這些二進位制資料的格式,這就是通訊網路協議。簡單講,就是需要約定好二進位制資料包中每一段位元組的含義,比如從第 n 位元組開始的 m 長度是核心資料,有了這樣的約定後,我們就能解碼出想要的資料,執行業務邏輯,這樣我們就能暢通無阻的通訊了。
### 網路協議的設計
**概要劃分**
一個最基本的網路協議必須包含
- 資料的長度
- 資料
瞭解 `TCP` 協議的同學一定聽說過`粘包、拆包` 這兩個術語。因為`TCP`協議是資料流協議,它的底層根據二進位制緩衝區的實際情況進行包的劃分。所以,不可避免的會出現`粘包,拆包` 現象 。為了解決它們,我們的網路協議往往會使用一個 4 位元組的 `int` 型別來表示資料的大小。比如,`Netty` 就為我們提供了 `LengthFieldBasedFrameDecoder` 解碼器,它可以有效的使用自定義長度幀來解決上述問題。
同時一個好的網路協議,還會將動作和業務資料分離。試想一下, `HTTP` 協議的分為請求頭,請求體——
- 請求頭:定義了介面地址、`Http Method`、`HTTP` 版本
- 請求體:定義了需要傳遞的資料
這就是一種分離關注點的思想。所以自定義的網路協議也可以包含:
- 動作指令:比如定義 `code` 來分門別類的代表不同的業務邏輯
- 序列化演算法:描述了 `JAVA` 物件和二進位制之間轉換的形式,提供多種序列化/反序列化方式。比如 `json`、`protobuf` 等等,甚至是自定義演算法。比如:`rocketmq ` 等等。
同時,協議的開頭可以定義一個約定的`魔數`。這個固定值(4位元組),一般用來判斷當前的資料包是否合法。比如,當我們使用 `telnet` 傳送錯誤的資料包時,很顯然,它不合法,會導致解碼失敗。所以,為了減輕伺服器的壓力,我們可以取出資料包的前`4`個位元組與固定的`魔數`對比,如果是非法的格式,直接關閉連線,不繼續解碼。
**網路協議結構如下所示**:
```javascript
+--------------+-----------+------------+-----------+----------+
| 魔數(4) | code(1) |序列化演算法(1) |資料長度(4) |資料(n) |
+--------------+-----------+------------+-----------+----------+
```
### RocketMQ 通訊網路協議的實現
**RocketMQ 網路協議**
這一小節,我們從`RocketMQ` 中,分析優秀通訊網路協議的實現。`RocketMQ` 專案中,客戶端和服務端的通訊是基於 Netty 之上構建的。同時,為了更加有效的通訊,往往需要對傳送的訊息自定義網路協議。
`RocketMQ` 的網路協議,從資料分類的角度上看,可分為兩大類
- 訊息頭資料(Header Data)
- 訊息體資料(Body Data)
![](https://img2020.cnblogs.com/blog/299214/202003/299214-20200330040625987-1566249621.png)
從左到右
- 第一段:4 個位元組整數,等於2、3、4 長度總和
- 第二段:4 個位元組整數,等於3 的長度。特別的 `byte[0]` 代表序列化演算法,`byte[1~3]`才是真正的長度
- 第三段:代表訊息頭資料,結構如下
```java
{
"code":0,
"language":"JAVA",
"version":0,
"opaque":0,
"flag":1,
"remark":"hello, I am respponse /127.0.0.1:27603",
"extFields":{
"count":"0",
"messageTitle":"HelloMessageTitle"
}
}
```
- 第四段:代表訊息體資料
**RocketMQ 訊息頭協議詳細如下:**
| Header 欄位名 | 型別 | Request | Response |
| ------------- | ---------------------- | ------------------------------------------------------------ | -------------------------------------------- |
| code | 整數 | 請求操作程式碼,請求接收方根據不同的程式碼做不同的操作 | 應答結果程式碼,0表示成功,非0表示各種錯誤程式碼 |
| language | 字串 | 請求發起方實現語言,預設JAVA | 應答接收方實現語言 |
| version | 整數 | 請求發起方程式版本 | 應答接收方程式版本 |
| opaque | 整數 | 請求發起方在同一連線上不同的請求標識程式碼,多執行緒連線複用使用 | 應答方不做修改,直接返回 |
| flag | 整數 | 通訊層的標誌位 | 通訊層的標誌位 |
| remark | 字串 | 傳輸自定義文字資訊 | 錯誤詳細描述資訊 |
| extFields | HashMap | 請求自定義欄位 | 應答自定義欄位 |
**編碼過程**
`RocketMQ` 的通訊模組是基於 `Netty`的。通過定義 `NettyEncoder` 來實現對每一個 `Channel`的 出棧資料進行編碼,如下所示:
```java
@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder {
@Override
public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
throws Exception {
try {
ByteBuffer header = remotingCommand.encodeHeader();
out.writeBytes(header);
byte[] body = remotingCommand.getBody();
if (body != null) {
out.writeBytes(body);
}
} catch (Exception e) {
...
}
}
}
```
其中,核心的編碼過程位於 `RemotingCommand` 物件中,`encodeHeader` 階段,需要統計出訊息總長度,即:
- 定義訊息頭長度,一個整數表示:佔4個位元組
- 定義訊息頭資料,並計算其長度
- 定義訊息體資料,並計算其長度
- 額外再加 4是因為需要加入訊息總長度,一個整數表示:佔4個位元組
```java
public ByteBuffer encodeHeader(final int bodyLength) {
// 1> 訊息頭長度,一個整數表示:佔4個位元組
int length = 4;
// 2> 訊息頭資料
byte[] headerData;
headerData = this.headerEncode();
// 再加訊息頭資料長度
length += headerData.length;
// 3> 再加訊息體資料長度
length += bodyLength;
// 4> 額外加 4是因為需要加入訊息總長度,一個整數表示:佔4個位元組
ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);
// 5> 將訊息總長度加入 ByteBuffer
result.putInt(length);
// 6> 將訊息的頭長度加入 ByteBuffer
result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));
// 7> 將訊息頭資料加入 ByteBuffer
result.put(headerData);
result.flip();
return result;
}
```
其中,`encode` 階段會將 `CommandCustomHeader` 資料轉換 `HashMap`,方便序列化
```java
public void makeCustomHeaderToNet() {
if (this.customHeader != null) {
Field[] fields = getClazzFields(customHeader.getClass());
if (null == this.extFields) {
this.extFields = new HashMap();
}
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
String name = field.getName();
if (!name.startsWith("this")) {
Object value = null;
try {
field.setAccessible(true);
value = field.get(this.customHeader);
} catch (Exception e) {
log.error("Failed to access field [{}]", name, e);
}
if (value != null) {
this.extFields.put(name, value.toString());
}
}
}
}
}
}
```
特別的,訊息頭序列化支援兩種演算法:
- `JSON`
- `RocketMQ`
```java
private byte[] headerEncode() {
this.makeCustomHeaderToNet();
if (SerializeType.ROCKETMQ == serializeTypeCurrentRPC) {
return RocketMQSerializable.rocketMQProtocolEncode(this);
} else {
return RemotingSerializable.encode(this);
}
}
```
這兒需要值得注意的是,`encode`階段將當前 `RPC` 型別和 `headerData`長度編碼到一個 `byte[4]` 陣列中,`byte[0]` 位序列化型別。
```java
public static byte[] markProtocolType(int source, SerializeType type) {
byte[] result = new byte[4];
result[0] = type.getCode();
result[1] = (byte) ((source >> 16) & 0xFF);
result[2] = (byte) ((source >> 8) & 0xFF);
result[3] = (byte) (source & 0xFF);
return result;
}
```
其中,通過與運算 `& 0xFF` 取低八位資料。
所以, 最終 `length` 長度等於序列化型別 + header length + header data + body data 的位元組的長度。
**解碼過程**
`RocketMQ` 解碼通過`NettyDecoder`來實現,它繼承自 `LengthFieldBasedFrameDecoder`,其中呼叫了父類`LengthFieldBasedFrameDecoder`的建構函式
```java
super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
```
這些引數設定`4`個位元組代表 `length`總長度,同時解碼時跳過最開始的`4`個位元組:
```java
frame = (ByteBuf) super.decode(ctx, in);
```
所以,得到的 `frame`= 序列化型別 + header length + header data + body data 。解碼如下所示:
```java
public static RemotingCommand decode(final ByteBuffer byteBuffer) {
//總長度
int length = byteBuffer.limit();
//原始的 header length,4位
int oriHeaderLen = byteBuffer.getInt();
//真正的 header data 長度。忽略 byte[0]的 serializeType
int headerLength = getHeaderLength(oriHeaderLen);
byte[] headerData = new byte[headerLength];
byteBuffer.get(headerData);
RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));
int bodyLength = length - 4 - headerLength;
byte[] bodyData = null;
if (bodyLength > 0) {
bodyData = new byte[bodyLength];
byteBuffer.get(bodyData);
}
cmd.body = bodyData;
return cmd;
}
private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
switch (type) {
case JSON:
RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
resultJson.setSerializeTypeCurrentRPC(type);
return resultJson;
case ROCKETMQ:
RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
resultRMQ.setSerializeTypeCurrentRPC(type);
return resultRMQ;
default:
break;
}
return null;
}
```
其中,`getProtocolType`,右移 `24`位,拿到 `serializeType`:
```java
public static SerializeType getProtocolType(int source) {
return SerializeType.valueOf((byte) ((source >> 24) & 0xFF));
}
```
`getHeaderLength` 拿到 0-24 位代表的 `headerData` length:
```java
public static int getHeaderLength(int length) {
return length & 0xFFFFFF;
}
```
### 小結
對於諸多中介軟體而言,底層的網路通訊模組往往會使用 `Netty`。`Netty` 提供了諸多的編解碼器,可以快速方便的上手。本文從如何設計一個網路協議入手,最終切入到 `RocketMQ` 底層網路協議的實現。可以看到,它並不複雜。仔細研讀幾遍變能理解其奧義。具體參考類`NettyEncoder`、`NettyDecoder`、`RemotingCom