一文徹底理解Redis序列化協議,你也可以編寫Redis客戶端
前提
最近學習Netty
的時候想做一個基於Redis
服務協議的編碼解碼模組,過程中順便閱讀了Redis
服務序列化協議RESP
,結合自己的理解對文件進行了翻譯並且簡單實現了RESP
基於Java
語言的解析。編寫本文的使用使用的JDK
版本為[8+]
。
RESP簡介
Redis
客戶端與Redis
服務端基於一個稱作RESP
的協議進行通訊,RESP
全稱為Redis Serialization Protocol
,也就是Redis
序列化協議。雖然RESP
為Redis
設計,但是它也可以應用在其他客戶端-服務端(Client-Server
)的軟體專案中。RESP
在設計的時候折中考慮瞭如下幾點:
- 易於實現。
- 快速解析。
- 可讀性高。
RESP
可以序列化不同的資料型別,如整型、字串、陣列還有一種特殊的Error
型別。需要執行的Redis
命令會封裝為類似於字串陣列的請求然後通過Redis
客戶端傳送到Redis
服務端。Redis
服務端會基於特定的命令型別選擇對應的一種資料型別進行回覆(這一句是意譯,原文是:Redis replies with a command-specific data type
)。
RESP
是二進位制安全的(binary-safe
),並且在RESP
下不需要處理從一個程序傳輸到另一個程序的批量資料,因為它使用了字首長度(prefixed-length
,後面會分析,就是在每個資料塊的字首已經定義好資料塊的個數,類似於Netty
注意:此處概述的協議僅僅使用在客戶端-服務端通訊,Redis Cluster
使用不同的二進位制協議在多個節點之間交換訊息(也就是Redis
叢集中的節點之間並不使用RESP
通訊)。
網路層
Redis
客戶端通過建立一個在6379
埠的TCP
連線,連線到Redis
服務端。
雖然RESP
在底層通訊協議技術上是非TCP
特定的,但在Redis
的上下文中,RESP
僅用於TCP
連線(或類似的面向流的連線,如Unix
套接字)。
請求-響應模型
Redis
服務端接收由不同引數組成的命令,接收到命令並將其處理之後會把回覆傳送回Redis
客戶端。這是最簡單的模型,但是有兩種例外的情況:
Redis
支援管道(Pipelining
,流水線,多數情況下習慣稱為管道)操作。使用管道的情況下,Redis
客戶端可以一次傳送多個命令,然後等待一次性的回覆(文中的回覆是replies
,理解為Redis
服務端會一次性返回一個批量回復結果)。- 當
Redis
客戶端訂閱Pub/Sub
通道時,該協議會更改語義併成為推送協議(push protocol
),也就是說,客戶端不再需要傳送命令,因為Redis
服務端將自動向客戶端(訂閱了改通道的客戶端)傳送新訊息(這裡的意思是:在訂閱/釋出模式下,訊息是由Redis
服務端主動推送給訂閱了特定通道的Redis
客戶端)。
除了上述兩個特例之外,Redis
協議是一種簡單的請求-響應協議。
RESP支援的資料型別
RESP
在Redis 1.2
中引入,在Redis 2.0
,RESP
正式成為與Redis
服務端通訊的標準方案。也就是如果需要編寫Redis
客戶端,你就必須在客戶端中實現此協議。
RESP
本質上是一種序列化協議,它支援的資料型別如下:單行字串、錯誤訊息、整型數字、定長字串和RESP
陣列。
RESP
在Redis
中用作請求-響應協議的方式如下:
Redis
客戶端將命令封裝為RESP
的陣列型別(陣列元素都是定長字串型別,注意這一點,很重要)傳送到Redis
伺服器。Redis
服務端根據命令實現選擇對應的RESP
資料型別之一進行回覆。
在RESP
中,資料型別取決於資料報的第一個位元組:
- 單行字串的第一個位元組為
+
。 - 錯誤訊息的第一個位元組為
-
。 - 整型數字的第一個位元組為
:
。 - 定長字串的第一個位元組為
$
。 RESP
陣列的第一個位元組為*
。
另外,在RESP
中可以使用定長字串或者陣列的特殊變體來表示Null
值,後面會提及。在RESP
中,協議的不同部分始終以\r\n
(CRLF
)終止。
目前RESP
中5種資料型別的小結如下:
資料型別 | 本文翻譯名稱 | 基本特徵 | 例子 |
---|---|---|---|
Simple String |
單行字串 | 第一個位元組是+ ,最後兩個位元組是\r\n ,其他位元組是字串內容 |
+OK\r\n |
Error |
錯誤訊息 | 第一個位元組是- ,最後兩個位元組是\r\n ,其他位元組是異常訊息的文字內容 |
-ERR\r\n |
Integer |
整型數字 | 第一個位元組是: ,最後兩個位元組是\r\n ,其他位元組是數字的文字內容 |
:100\r\n |
Bulk String |
定長字串 | 第一個位元組是$ ,緊接著的位元組是內容字串長度\r\n ,最後兩個位元組是\r\n ,其他位元組是字串內容 |
$4\r\ndoge\r\n |
Array |
RESP 陣列 |
第一個位元組是* ,緊接著的位元組是元素個數\r\n ,最後兩個位元組是\r\n ,其他位元組是各個元素的內容,每個元素可以是任意一種資料型別 |
*2\r\n:100\r\n$4\r\ndoge\r\n |
下面的小節是對每種資料型別的更細緻的分析。
RESP簡單字串-Simple String
簡單字串的編碼方式如下:
- (1)第一個位元組為
+
。 - (2)緊接著的是一個不能包含
CR
或者LF
字元的字串。 - (3)以
CRLF
終止。
簡單字串能夠保證在最小開銷的前提下傳輸非二進位制安全的字串。例如很多Redis
命令執行成功後服務端需要回復OK
字串,此時通過簡單字串編碼為5位元組的資料報如下:
+OK\r\n
如果需要傳送二進位制安全的字串,那麼需要使用定長字串。
當Redis
服務端用簡單字串響應時,Redis
客戶端庫應該向呼叫者返回一個字串,該響應到呼叫者的字串由+
之後直到字串內容末尾的字元組成(其實就是上面提到的第(2)部分的內容),不包括最後的CRLF
位元組。
RESP錯誤訊息-Error
錯誤訊息型別是RESP
特定的資料型別。實際上,錯誤訊息型別和簡單字串型別基本一致,只是其第一個位元組為-
。錯誤訊息型別跟簡單字串型別的最大區別是:錯誤訊息作為Redis
服務端響應的時候,對於客戶端而言應該感知為異常,而錯誤訊息中的字串內容應該感知為Redis
服務端返回的錯誤資訊。錯誤訊息的編碼方式如下:
- (1)第一個位元組為
-
。 - (2)緊接著的是一個不能包含
CR
或者LF
字元的字串。 - (3)以
CRLF
終止。
一個簡單的例子如下:
-Error message\r\n
Redis
服務端只有在真正發生錯誤或者感知錯誤的時候才會回覆錯誤訊息,例如嘗試對錯誤的資料型別執行操作或者命令不存在等等。Redis
客戶端接收到錯誤訊息的時候,應該觸發異常(一般情況就是直接丟擲異常,可以根據錯誤訊息的內容進行異常分類)。下面是錯誤訊息響應的一些例子:
-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value
-
之後的第一個單詞到第一個空格或換行符之間的內容,代表返回的錯誤型別。這只是Redis
使用的約定,不是RESP
錯誤訊息格式的一部分。
例如,ERR
是通用錯誤,WRONGTYPE
則是更具體的錯誤,表示客戶端試圖針對錯誤的資料型別執行操作。這種定義方式稱為錯誤字首,是一種使客戶端能夠理解伺服器返回的錯誤型別的方法,而不必依賴於所給出的確切訊息定義,該訊息可能會隨時間而變化。
客戶端實現可以針對不同的錯誤型別返回不同種類的異常,或者可以通過將錯誤型別的名稱作為字串直接提供給呼叫方來提供捕獲錯誤的通用方法。
但是,不應該將錯誤訊息分類處理的功能視為至關重要的功能,因為它作用並不巨大,並且有些的客戶端實現可能會簡單地返回特定值去遮蔽錯誤訊息作為通用的異常處理,例如直接返回false
。
RESP整型數字-Integer
整型數字的編碼方式如下:
- (1)第一個位元組為
:
。 - (2)緊接著的是一個不能包含
CR
或者LF
字元的字串,也就是數字要先轉換為字元序列,最終要輸出為位元組。 - (3)以
CRLF
終止。
例如:
:0\r\n
:1000\r\n
許多Redis
命令返回整型數字,像INCR
,LLEN
和LASTSAVE
命令等等。
返回的整型數字沒有特殊的含義,像INCR
返回的是增量的總量,而LASTSAVE
是UNIX
時間戳。但是Redis
服務端保證返回的整型數字在帶符號的64位整數範圍內。
有些情況下,返回的整型數字會指代true
或者false
。如EXISTS
或者SISMEMBER
命令執行返回1代表true
,0代表false
。
有些情況下,返回的整型數字會指代命令是否真正產生了效果。如SADD
,SREM
和SETNX
命令執行返回1代表命令執行生效,0代表命令執行不生效(等價於命令沒有執行)。
下面的一組命令執行後都是返回整型數字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD
。
RESP定長字串-Bulk String
定長字串用於表示一個最大長度為512MB
的二進位制安全的字串(Bulk
,本身有體積大的含義)。定長字串的編碼方式如下:
- (1)第一個位元組為
$
。 - (2)緊接著的是組成字串的位元組數長度(稱為
prefixed length
,也就是字首長度),字首長度分塊以CRLF
終止。 - (3)然後是一個不能包含
CR
或者LF
字元的字串,也就是數字要先轉換為字元序列,最終要輸出為位元組。 - (4)以
CRLF
終止。
舉個例子,doge
使用定長字串編碼如下:
第一個位元組 | 字首長度 | CRLF |
字串內容 | CRLF |
定長字串 | |
---|---|---|---|---|---|---|
$ |
4 |
\r\n |
doge |
\r\n |
===> |
$4\r\ndoge\r\n |
foobar
使用定長字串編碼如下:
第一個位元組 | 字首長度 | CRLF |
字串內容 | CRLF |
定長字串 | |
---|---|---|---|---|---|---|
$ |
6 |
\r\n |
foobar |
\r\n |
===> |
$6\r\nfoobar\r\n |
表示空字串(Empty String
,對應Java中的""
) 的時候,使用定長字串編碼如下:
第一個位元組 | 字首長度 | CRLF |
CRLF |
定長字串 | |
---|---|---|---|---|---|
$ |
0 |
\r\n |
\r\n |
===> |
$0\r\n\r\n |
定長字串也可以使用特殊的格式來表示Null
值,指代值不存在。在這種特殊格式中,字首長度為-1,並且沒有資料,因此使用定長字串對Null
值進行編碼如下:
第一個位元組 | 字首長度 | CRLF |
定長字串 | |
---|---|---|---|---|
$ |
-1 |
\r\n |
===> |
$-1\r\n |
當Redis
服務端返回定長字串編碼的Null
值的時候,客戶端不應該返回空字串,而應該返回對應程式語言中的Null
物件。例如Ruby
中對應nil
,C
語言中對應NULL
,Java
中對應null
,以此類推。
RESP陣列-Array
Redis
客戶端使用RESP
陣列傳送命令到Redis
服務端。與此相似,某些Redis
命令執行完畢後服務端需要使用RESP
陣列型別將元素集合返回給客戶端,如返回一個元素列表的LRANGE
命令。RESP
陣列和我們認知中的陣列並不完全一致,它的編碼格式如下:
- (1)第一個位元組為
*
。 - (2)緊接著的是組成
RESP
陣列的元素個數(十進位制數,但是最終需要轉換為位元組序列,如10需要轉換為1
和0
兩個相鄰的位元組),元素個數分塊以CRLF
終止。 - (3)
RESP
陣列的每個元素內容,每個元素可以是任意的RESP
資料型別。
一個空的RESP
陣列的編碼如下:
*0\r\n
一個包含2個定長字串元素內容分別為foo
和bar
的RESP
陣列的編碼如下:
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
通用格式就是:*<count>CRLF
作為RESP
陣列的字首部分,而組成RESP
陣列的其他資料型別的元素只是一個接一個地串聯在一起。例如一個包含3個整數型別元素的RESP
陣列的編碼如下:
*3\r\n:1\r\n:2\r\n:3\r\n
RESP
陣列的元素不一定是同一種資料型別,可以包含混合型別的元素。例如下面是一個包含4個整數型別元素和1個定長字串型別元素(一共有5個元素)的RESP
陣列的編碼(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數
*5\r\n
# 第1個整型型別的元素
:1\r\n
# 第2個整型型別的元素
:2\r\n
# 第3個整型型別的元素
:3\r\n
# 第4個整型型別的元素
:4\r\n
# 定長字串型別的元素
$6\r\n
foobar\r\n
Redis
服務端響應報的首行*5\r\n
定義了後面會緊跟著5個回覆資料,然後每個回覆資料分別作元素項,構成了用於傳輸的多元素定長回覆(Multi Bulk Reply
,感覺比較難翻譯,這裡的大概意思就是每個回覆行都是整個回覆報中的一個項)。
這裡可以類比為Java
中的ArrayList
(泛型擦除),有點類似於下面的虛擬碼:
List encode = new ArrayList();
// 新增元素個數
encode.add(elementCount);
encode.add(CRLF);
// 新增第1個整型型別的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 新增第2個整型型別的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 新增第3個整型型別的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 新增第4個整型型別的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 新增定長字串型別的元素
encode.add('$');
// 字首長度
encode.add(6);
// 字串內容
encode.add("foobar");
encode.add(CRLF);
RESP
陣列中也存在Null
值的概念,下面稱為RESP Null Array
。處於歷史原因,RESP
陣列中採用了另一種特殊的編碼格式定義Null
值,區別於定長字串中的Null
值字串。例如,BLPOP
命令執行超時的時候,就會返回一個RESP Null Array
型別的響應。RESP Null Array
的編碼如下:
*-1\r\n
當Redis
服務端的回覆是RESP Null Array
型別的時候,客戶端應該返回一個Null
物件,而不是一個空陣列或者空列表。這一點比較重要,它是區分回覆是空陣列(也就是命令正確執行完畢,返回結果正常)或者其他原因(如BLPOP
命令的超時等)的關鍵。
RESP
陣列的元素也可以是RESP
陣列,下面是一個包含2個RESP
陣列型別的元素的RESP
陣列,編碼如下(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
# 元素個數
*2\r\n
# 第1個RESP陣列元素
*3\r\n
:1\r\n
:2\r\n
:3\r\n
# 第2個RESP陣列元素
*2\r\n
+Foo\r\n
-Bar\r\n
上面的RESP
陣列的包含2個RESP
陣列型別的元素,第1個RESP
陣列元素包含3個整型型別的元素,而第2個RESP
陣列元素包含1個簡單字串型別的元素和1個錯誤訊息型別的元素。
RESP
陣列中的Null元素
RESP
陣列中的單個元素也有Null
值的概念,下面稱為Null
元素。Redis
服務端回覆如果是RESP
陣列型別,並且RESP
陣列中存在Null
元素,那麼意味著元素丟失,絕對不能用空字串替代。缺少指定鍵的前提下,當與GET
模式選項一起使用時,SORT
命令可能會發生這種情況。
下面是一個包含Null
元素的RESP
陣列的例子(為了看得更清楚,分多行進行編碼,實際上不能這樣做):
*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n
RESP
陣列中的第2個元素是Null
元素,客戶端API
最終返回的內容應該是:
# Ruby
["foo",nil,"bar"]
# Java
["foo",null,"bar"]
RESP其他相關內容
主要包括:
- 將命令傳送到Redis服務端的示例。
- 批量命令與管道。
- 內聯命令(
Inline Commands
)。
其實文件中還有一節使用C
語言編寫高效能RESP
解析器,這裡不做翻譯,因為掌握RESP
的相關內容後,可以基於任何語言編寫解析器。
將命令傳送到Redis服務端
如果已經相對熟悉RESP
中的序列化格式,那麼編寫Redis
客戶端類庫就會變得很容易。我們可以進一步指定客戶端和伺服器之間的互動方式:
Redis
客戶端向Redis
服務端傳送僅僅包含定長字串型別元素的RESP
陣列。Redis
服務端可以採用任意一種RESP
資料型別向Redis
客戶端進行回覆,具體的資料型別一般取決於命令型別。
下面是典型的互動例子:Redis
客戶端傳送命令LLEN mylist
以獲得KEY
為mylist
的長度,Redis
服務端將以整數型別進行回覆,如以下示例所示(C
是客戶端,S
伺服器),虛擬碼如下:
C: *2\r\n
C: $4\r\n
C: LLEN\r\n
C: $6\r\n
C: mylist\r\n
S: :48293\r\n
為了簡單起見,我們使用換行符來分隔協議的不同部分(這裡指上面的程式碼分行展示),但是實際互動的時候Redis
客戶端在傳送*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n
的時候是整體傳送的。
批量命令與管道
Redis
客戶端可以使用相同的連線傳送批量命令。Redis
支援管道特性,因此Redis
客戶端可以通過一次寫操作傳送多個命令,而無需在傳送下一個命令之前讀取Redis
服務端對上一個命令的回覆。批量傳送命令之後,所有的回覆可以在最後得到(合併為一個回覆)。更多相關資訊可以檢視Using pipelining to speedup Redis queries。
內聯命令
有些場景下,我們可能只有telnet
命令可以使用,在這種條件下,我們需要傳送命令到Redis
服務端。儘管Redis
協議易於實現,但在互動式會話中並不理想,並且redis-cli
有些情況下不一定可用。處於這類原因,Redis
設計了一種專為人類設計的命令格式,稱為內聯命令(Inline Command
格式。
以下是伺服器/客戶端使用內聯命令進行聊天的示例(S代表服務端,C代表客戶端):
C: PING
S: +PONG
以下是使用內聯命令返回整數的另一個示例:
C: EXISTS somekey
S: :0
基本上只需在telnet
會話中編寫以空格分隔的引數。由於除了統一的請求協議之外沒有命令會以*
開頭,Redis
能夠檢測到這種情況並解析輸入的命令。
基於RESP編寫高效能解析器
因為JDK
原生提供的位元組緩衝區java.nio.ByteBuffer
存在不能自動擴容、需要切換讀寫模式等等問題,這裡直接引入Netty
並且使用Netty
提供的ByteBuf
進行RESP
資料型別解析。編寫本文的時候(2019-10-09)Netty
的最新版本為4.1.42.Final
。引入依賴:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.42.Final</version>
</dependency>
定義解碼器介面:
public interface RespDecoder<V>{
V decode(ByteBuf buffer);
}
定義常量:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
下面的章節中解析模組的實現已經忽略第一個位元組的解析,因為第一個位元組是決定具體的資料型別。
解析簡單字串
簡單字串型別就是單行字串,它的解析結果對應的就是Java
中的String
型別。解碼器實現如下:
// 解析單行字串
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計算位元組長度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
這裡抽取出一個類LineStringDecoder
用於解析單行字串,這樣在解析錯誤訊息的時候可以做一次繼承即可。測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// +OK\r\n
buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:OK
解析錯誤訊息
錯誤訊息的本質也是單行字串,所以其解碼的實現可以和簡單字串的解碼實現一致。錯誤訊息資料型別的解碼器如下:
public class RespErrorDecoder extends LineStringDecoder {
}
測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// -ERR unknown command 'foobar'\r\n
buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
解析整型數字
整型數字型別,本質就是需要從位元組序列中還原出帶符號的64bit
的長整型,因為是帶符號的,型別標識位:
後的第一個位元組需要判斷是否負數字符-
,因為是從左向右解析,然後每解析出一個新的位,當前的數字值要乘10
。其解碼器的實現如下:
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負數
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
整型數字型別的解析相對複雜,一定要注意負數判斷。測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// :-1000\r\n
buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
Long value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:-1000
解析定長字串
定長字串型別解析的關鍵是先讀取型別識別符號$
後的第一個位元組序列分塊解析成64bit
帶符號的整數,用來確定後面需要解析的字串內容的位元組長度,然後再按照該長度讀取後面的位元組。其解碼器實現如下:
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 使用RespIntegerDecoder讀取長度
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實位元組內容的長度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
測試一下:
public static void main(String[] args) throws Exception{
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// $6\r\nthrowable\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
String value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:throwable
解析RESP陣列
RESP
陣列型別解析的關鍵:
- 先讀取型別識別符號
*
後的第一個位元組序列分塊解析成64bit
帶符號的整數,確定陣列中的元素個數。 - 遞迴解析每個元素。
參考過不少Redis
協議解析框架,不少是用棧或者狀態機實現,這裡先簡單點用遞迴實現,解碼器程式碼如下:
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個數
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞迴
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
測試一下:
public static void main(String[] args) throws Exception {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
//*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
buffer.writeBytes(RespConstants.CRLF);
List value = RespCodec.X.decode(buffer);
log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
小結
對RESP
的內容和其編碼解碼的過程有相對深刻的認識後,就可以基於Netty
編寫Redis
服務的編碼解碼模組,作為Netty
入門的十分有意義的例子。本文的最後一節只演示了RESP
的解碼部分,編碼模組和更多細節會在另一篇用Netty
實現Redis
客戶端的文章中展示。
參考資料:
- Redis Protocol specification
連結
希望你能讀到這裡,然後發現我:
- Github Page:http://www.throwable.club/2019/10/09/redis-serialization-protocol-decode-guide
- Coding Page:http://throwable.coding.me/2019/10/09/redis-serialization-protocol-decode-guide
附錄
本文涉及的所有程式碼:
public class RespConstants {
public static final Charset ASCII = StandardCharsets.US_ASCII;
public static final Charset UTF_8 = StandardCharsets.UTF_8;
public static final byte DOLLAR_BYTE = '$';
public static final byte ASTERISK_BYTE = '*';
public static final byte PLUS_BYTE = '+';
public static final byte MINUS_BYTE = '-';
public static final byte COLON_BYTE = ':';
public static final String EMPTY_STRING = "";
public static final Long ZERO = 0L;
public static final Long NEGATIVE_ONE = -1L;
public static final byte CR = (byte) '\r';
public static final byte LF = (byte) '\n';
public static final byte[] CRLF = "\r\n".getBytes(ASCII);
public enum ReplyType {
SIMPLE_STRING,
ERROR,
INTEGER,
BULK_STRING,
RESP_ARRAY
}
}
public enum CodecUtils {
X;
public int findLineEndIndex(ByteBuf buffer) {
int index = buffer.forEachByte(ByteProcessor.FIND_LF);
return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
}
public String readLine(ByteBuf buffer) {
int lineEndIndex = findLineEndIndex(buffer);
if (lineEndIndex > -1) {
int lineStartIndex = buffer.readerIndex();
// 計算位元組長度
int size = lineEndIndex - lineStartIndex - 1;
byte[] bytes = new byte[size];
buffer.readBytes(bytes);
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(lineEndIndex + 1);
buffer.markReaderIndex();
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public interface RespCodec {
RespCodec X = DefaultRespCodec.X;
<IN, OUT> OUT decode(ByteBuf buffer);
<IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
X;
static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
static {
DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
}
@SuppressWarnings("unchecked")
@Override
public <IN, OUT> OUT decode(ByteBuf buffer) {
return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
}
private ReplyType determineReplyType(ByteBuf buffer) {
byte firstByte = buffer.readByte();
ReplyType replyType;
switch (firstByte) {
case RespConstants.PLUS_BYTE:
replyType = ReplyType.SIMPLE_STRING;
break;
case RespConstants.MINUS_BYTE:
replyType = ReplyType.ERROR;
break;
case RespConstants.COLON_BYTE:
replyType = ReplyType.INTEGER;
break;
case RespConstants.DOLLAR_BYTE:
replyType = ReplyType.BULK_STRING;
break;
case RespConstants.ASTERISK_BYTE:
replyType = ReplyType.RESP_ARRAY;
break;
default: {
throw new IllegalArgumentException("first byte:" + firstByte);
}
}
return replyType;
}
@Override
public <IN, OUT> ByteBuf encode(IN in) {
// TODO
throw new UnsupportedOperationException("encode");
}
}
public interface RespDecoder<V> {
V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
throw new IllegalStateException("decoder");
}
}
public class LineStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
return CodecUtils.X.readLine(buffer);
}
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
@Override
public Long decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
// 沒有行尾,異常
if (-1 == lineEndIndex) {
return null;
}
long result = 0L;
int lineStartIndex = buffer.readerIndex();
boolean negative = false;
byte firstByte = buffer.getByte(lineStartIndex);
// 負數
if (RespConstants.MINUS_BYTE == firstByte) {
negative = true;
} else {
int digit = firstByte - '0';
result = result * 10 + digit;
}
for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
byte value = buffer.getByte(i);
int digit = value - '0';
result = result * 10 + digit;
}
if (negative) {
result = -result;
}
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(lineEndIndex + 1);
return result;
}
}
public class RespBulkStringDecoder implements RespDecoder<String> {
@Override
public String decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Bulk Null String
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Bulk Empty String
if (RespConstants.ZERO.equals(length)) {
return RespConstants.EMPTY_STRING;
}
// 真實位元組內容的長度
int readLength = (int) length.longValue();
if (buffer.readableBytes() > readLength) {
byte[] bytes = new byte[readLength];
buffer.readBytes(bytes);
// 重置讀遊標為\r\n之後的第一個位元組
buffer.readerIndex(buffer.readerIndex() + 2);
return new String(bytes, RespConstants.UTF_8);
}
return null;
}
}
public class RespArrayDecoder implements RespDecoder {
@Override
public Object decode(ByteBuf buffer) {
int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
if (-1 == lineEndIndex) {
return null;
}
// 解析元素個數
Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
if (null == length) {
return null;
}
// Null Array
if (RespConstants.NEGATIVE_ONE.equals(length)) {
return null;
}
// Array Empty List
if (RespConstants.ZERO.equals(length)) {
return Lists.newArrayList();
}
List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
// 遞迴
for (int i = 0; i < length; i++) {
result.add(DefaultRespCodec.X.decode(buffer));
}
return result;
}
}
(本文完 e-a-20191009 c-2-d