1. 程式人生 > >一文徹底理解Redis序列化協議,你也可以編寫Redis客戶端

一文徹底理解Redis序列化協議,你也可以編寫Redis客戶端

前提

最近學習Netty的時候想做一個基於Redis服務協議的編碼解碼模組,過程中順便閱讀了Redis服務序列化協議RESP,結合自己的理解對文件進行了翻譯並且簡單實現了RESP基於Java語言的解析。編寫本文的使用使用的JDK版本為[8+]

RESP簡介

Redis客戶端與Redis服務端基於一個稱作RESP的協議進行通訊,RESP全稱為Redis Serialization Protocol,也就是Redis序列化協議。雖然RESPRedis設計,但是它也可以應用在其他客戶端-服務端(Client-Server)的軟體專案中。RESP在設計的時候折中考慮瞭如下幾點:

  • 易於實現。
  • 快速解析。
  • 可讀性高。

RESP可以序列化不同的資料型別,如整型、字串、陣列還有一種特殊的Error型別。需要執行的Redis命令會封裝為類似於字串陣列的請求然後通過Redis客戶端傳送到Redis服務端。Redis服務端會基於特定的命令型別選擇對應的一種資料型別進行回覆(這一句是意譯,原文是:Redis replies with a command-specific data type)。

RESP是二進位制安全的(binary-safe),並且在RESP下不需要處理從一個程序傳輸到另一個程序的批量資料,因為它使用了字首長度(prefixed-length,後面會分析,就是在每個資料塊的字首已經定義好資料塊的個數,類似於Netty

裡面的定長編碼解碼)來傳輸批量資料。

注意:此處概述的協議僅僅使用在客戶端-服務端通訊,Redis Cluster使用不同的二進位制協議在多個節點之間交換訊息(也就是Redis叢集中的節點之間並不使用RESP通訊)。

網路層

Redis客戶端通過建立一個在6379埠的TCP連線,連線到Redis服務端。

雖然RESP在底層通訊協議技術上是非TCP特定的,但在Redis的上下文中,RESP僅用於TCP連線(或類似的面向流的連線,如Unix套接字)。

請求-響應模型

Redis服務端接收由不同引數組成的命令,接收到命令並將其處理之後會把回覆傳送回Redis客戶端。這是最簡單的模型,但是有兩種例外的情況:

  • Redis支援管道(Pipelining,流水線,多數情況下習慣稱為管道)操作。使用管道的情況下,Redis客戶端可以一次傳送多個命令,然後等待一次性的回覆(文中的回覆是replies,理解為Redis服務端會一次性返回一個批量回復結果)。
  • Redis客戶端訂閱Pub/Sub通道時,該協議會更改語義併成為推送協議(push protocol),也就是說,客戶端不再需要傳送命令,因為Redis服務端將自動向客戶端(訂閱了改通道的客戶端)傳送新訊息(這裡的意思是:在訂閱/釋出模式下,訊息是由Redis服務端主動推送給訂閱了特定通道的Redis客戶端)。

除了上述兩個特例之外,Redis協議是一種簡單的請求-響應協議。

RESP支援的資料型別

RESPRedis 1.2中引入,在Redis 2.0RESP正式成為與Redis服務端通訊的標準方案。也就是如果需要編寫Redis客戶端,你就必須在客戶端中實現此協議。

RESP本質上是一種序列化協議,它支援的資料型別如下:單行字串、錯誤訊息、整型數字、定長字串和RESP陣列。

RESPRedis中用作請求-響應協議的方式如下:

  • Redis客戶端將命令封裝為RESP的陣列型別(陣列元素都是定長字串型別,注意這一點,很重要)傳送到Redis伺服器。
  • Redis服務端根據命令實現選擇對應的RESP資料型別之一進行回覆。

RESP中,資料型別取決於資料報的第一個位元組:

  • 單行字串的第一個位元組為+
  • 錯誤訊息的第一個位元組為-
  • 整型數字的第一個位元組為:
  • 定長字串的第一個位元組為$
  • RESP陣列的第一個位元組為*

另外,在RESP中可以使用定長字串或者陣列的特殊變體來表示Null值,後面會提及。在RESP中,協議的不同部分始終以\r\nCRLF)終止。

目前RESP中5種資料型別的小結如下:

資料型別 本文翻譯名稱 基本特徵 例子
Simple String 單行字串 第一個位元組是+,最後兩個位元組是\r\n,其他位元組是字串內容 +OK\r\n
Error 錯誤訊息 第一個位元組是-,最後兩個位元組是\r\n,其他位元組是異常訊息的文字內容 -ERR\r\n
Integer 整型數字 第一個位元組是:,最後兩個位元組是\r\n,其他位元組是數字的文字內容 :100\r\n
Bulk String 定長字串 第一個位元組是$,緊接著的位元組是內容字串長度\r\n,最後兩個位元組是\r\n,其他位元組是字串內容 $4\r\ndoge\r\n
Array RESP陣列 第一個位元組是*,緊接著的位元組是元素個數\r\n,最後兩個位元組是\r\n,其他位元組是各個元素的內容,每個元素可以是任意一種資料型別 *2\r\n:100\r\n$4\r\ndoge\r\n

下面的小節是對每種資料型別的更細緻的分析。

RESP簡單字串-Simple String

簡單字串的編碼方式如下:

  • (1)第一個位元組為+
  • (2)緊接著的是一個不能包含CR或者LF字元的字串。
  • (3)以CRLF終止。

簡單字串能夠保證在最小開銷的前提下傳輸非二進位制安全的字串。例如很多Redis命令執行成功後服務端需要回復OK字串,此時通過簡單字串編碼為5位元組的資料報如下:

+OK\r\n

如果需要傳送二進位制安全的字串,那麼需要使用定長字串。

Redis服務端用簡單字串響應時,Redis客戶端庫應該向呼叫者返回一個字串,該響應到呼叫者的字串由+之後直到字串內容末尾的字元組成(其實就是上面提到的第(2)部分的內容),不包括最後的CRLF位元組。

RESP錯誤訊息-Error

錯誤訊息型別是RESP特定的資料型別。實際上,錯誤訊息型別和簡單字串型別基本一致,只是其第一個位元組為-。錯誤訊息型別跟簡單字串型別的最大區別是:錯誤訊息作為Redis服務端響應的時候,對於客戶端而言應該感知為異常,而錯誤訊息中的字串內容應該感知為Redis服務端返回的錯誤資訊。錯誤訊息的編碼方式如下:

  • (1)第一個位元組為-
  • (2)緊接著的是一個不能包含CR或者LF字元的字串。
  • (3)以CRLF終止。

一個簡單的例子如下:

-Error message\r\n

Redis服務端只有在真正發生錯誤或者感知錯誤的時候才會回覆錯誤訊息,例如嘗試對錯誤的資料型別執行操作或者命令不存在等等。Redis客戶端接收到錯誤訊息的時候,應該觸發異常(一般情況就是直接丟擲異常,可以根據錯誤訊息的內容進行異常分類)。下面是錯誤訊息響應的一些例子:

-ERR unknown command 'foobar'
-WRONGTYPE Operation against a key holding the wrong kind of value

-之後的第一個單詞到第一個空格或換行符之間的內容,代表返回的錯誤型別。這只是Redis使用的約定,不是RESP錯誤訊息格式的一部分。

例如,ERR是通用錯誤,WRONGTYPE則是更具體的錯誤,表示客戶端試圖針對錯誤的資料型別執行操作。這種定義方式稱為錯誤字首,是一種使客戶端能夠理解伺服器返回的錯誤型別的方法,而不必依賴於所給出的確切訊息定義,該訊息可能會隨時間而變化。

客戶端實現可以針對不同的錯誤型別返回不同種類的異常,或者可以通過將錯誤型別的名稱作為字串直接提供給呼叫方來提供捕獲錯誤的通用方法。

但是,不應該將錯誤訊息分類處理的功能視為至關重要的功能,因為它作用並不巨大,並且有些的客戶端實現可能會簡單地返回特定值去遮蔽錯誤訊息作為通用的異常處理,例如直接返回false

RESP整型數字-Integer

整型數字的編碼方式如下:

  • (1)第一個位元組為
  • (2)緊接著的是一個不能包含CR或者LF字元的字串,也就是數字要先轉換為字元序列,最終要輸出為位元組。
  • (3)以CRLF終止。

例如:

:0\r\n
:1000\r\n

許多Redis命令返回整型數字,像INCRLLENLASTSAVE命令等等。

返回的整型數字沒有特殊的含義,像INCR返回的是增量的總量,而LASTSAVEUNIX時間戳。但是Redis服務端保證返回的整型數字在帶符號的64位整數範圍內。

有些情況下,返回的整型數字會指代true或者false。如EXISTS或者SISMEMBER命令執行返回1代表true,0代表false

有些情況下,返回的整型數字會指代命令是否真正產生了效果。如SADDSREMSETNX命令執行返回1代表命令執行生效,0代表命令執行不生效(等價於命令沒有執行)。

下面的一組命令執行後都是返回整型數字:SETNX, DEL, EXISTS, INCR, INCRBY, DECR, DECRBY, DBSIZE, LASTSAVE, RENAMENX, MOVE, LLEN, SADD, SREM, SISMEMBER, SCARD

RESP定長字串-Bulk String

定長字串用於表示一個最大長度為512MB的二進位制安全的字串(Bulk,本身有體積大的含義)。定長字串的編碼方式如下:

  • (1)第一個位元組為$
  • (2)緊接著的是組成字串的位元組數長度(稱為prefixed length,也就是字首長度),字首長度分塊以CRLF終止。
  • (3)然後是一個不能包含CR或者LF字元的字串,也就是數字要先轉換為字元序列,最終要輸出為位元組。
  • (4)以CRLF終止。

舉個例子,doge使用定長字串編碼如下:

第一個位元組 字首長度 CRLF 字串內容 CRLF 定長字串
$ 4 \r\n doge \r\n ===> $4\r\ndoge\r\n

foobar使用定長字串編碼如下:

第一個位元組 字首長度 CRLF 字串內容 CRLF 定長字串
$ 6 \r\n foobar \r\n ===> $6\r\nfoobar\r\n

表示空字串(Empty String,對應Java中的"") 的時候,使用定長字串編碼如下:

第一個位元組 字首長度 CRLF CRLF 定長字串
$ 0 \r\n \r\n ===> $0\r\n\r\n

定長字串也可以使用特殊的格式來表示Null值,指代值不存在。在這種特殊格式中,字首長度為-1,並且沒有資料,因此使用定長字串對Null值進行編碼如下:

第一個位元組 字首長度 CRLF 定長字串
$ -1 \r\n ===> $-1\r\n

Redis服務端返回定長字串編碼的Null值的時候,客戶端不應該返回空字串,而應該返回對應程式語言中的Null物件。例如Ruby中對應nilC語言中對應NULLJava中對應null,以此類推。

RESP陣列-Array

Redis客戶端使用RESP陣列傳送命令到Redis服務端。與此相似,某些Redis命令執行完畢後服務端需要使用RESP陣列型別將元素集合返回給客戶端,如返回一個元素列表的LRANGE命令。RESP陣列和我們認知中的陣列並不完全一致,它的編碼格式如下:

  • (1)第一個位元組為*
  • (2)緊接著的是組成RESP陣列的元素個數(十進位制數,但是最終需要轉換為位元組序列,如10需要轉換為10兩個相鄰的位元組),元素個數分塊以CRLF終止。
  • (3)RESP陣列的每個元素內容,每個元素可以是任意的RESP資料型別。

一個空的RESP陣列的編碼如下:

*0\r\n

一個包含2個定長字串元素內容分別為foobarRESP陣列的編碼如下:

*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

通用格式就是:*<count>CRLF作為RESP陣列的字首部分,而組成RESP陣列的其他資料型別的元素只是一個接一個地串聯在一起。例如一個包含3個整數型別元素的RESP陣列的編碼如下:

*3\r\n:1\r\n:2\r\n:3\r\n

RESP陣列的元素不一定是同一種資料型別,可以包含混合型別的元素。例如下面是一個包含4個整數型別元素和1個定長字串型別元素(一共有5個元素)的RESP陣列的編碼(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

# 元素個數
*5\r\n
# 第1個整型型別的元素
:1\r\n
# 第2個整型型別的元素
:2\r\n
# 第3個整型型別的元素
:3\r\n
# 第4個整型型別的元素
:4\r\n
# 定長字串型別的元素
$6\r\n
foobar\r\n

Redis服務端響應報的首行*5\r\n定義了後面會緊跟著5個回覆資料,然後每個回覆資料分別作元素項,構成了用於傳輸的多元素定長回覆(Multi Bulk Reply,感覺比較難翻譯,這裡的大概意思就是每個回覆行都是整個回覆報中的一個項)。

這裡可以類比為Java中的ArrayList(泛型擦除),有點類似於下面的虛擬碼:

List encode = new ArrayList();
// 新增元素個數
encode.add(elementCount);
encode.add(CRLF);
// 新增第1個整型型別的元素 - 1
encode.add(':');
encode.add(1);
encode.add(CRLF);
// 新增第2個整型型別的元素 - 2
encode.add(':');
encode.add(2);
encode.add(CRLF);
// 新增第3個整型型別的元素 - 3
encode.add(':');
encode.add(3);
encode.add(CRLF);
// 新增第4個整型型別的元素 - 4
encode.add(':');
encode.add(4);
encode.add(CRLF);
// 新增定長字串型別的元素
encode.add('$');
// 字首長度
encode.add(6);
// 字串內容
encode.add("foobar");
encode.add(CRLF);

RESP陣列中也存在Null值的概念,下面稱為RESP Null Array。處於歷史原因,RESP陣列中採用了另一種特殊的編碼格式定義Null值,區別於定長字串中的Null值字串。例如,BLPOP命令執行超時的時候,就會返回一個RESP Null Array型別的響應。RESP Null Array的編碼如下:

*-1\r\n

Redis服務端的回覆是RESP Null Array型別的時候,客戶端應該返回一個Null物件,而不是一個空陣列或者空列表。這一點比較重要,它是區分回覆是空陣列(也就是命令正確執行完畢,返回結果正常)或者其他原因(如BLPOP命令的超時等)的關鍵。

RESP陣列的元素也可以是RESP陣列,下面是一個包含2個RESP陣列型別的元素的RESP陣列,編碼如下(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

# 元素個數
*2\r\n
# 第1個RESP陣列元素
*3\r\n
:1\r\n
:2\r\n
:3\r\n
# 第2個RESP陣列元素
*2\r\n
+Foo\r\n
-Bar\r\n

上面的RESP陣列的包含2個RESP陣列型別的元素,第1個RESP陣列元素包含3個整型型別的元素,而第2個RESP陣列元素包含1個簡單字串型別的元素和1個錯誤訊息型別的元素。

RESP陣列中的Null元素

RESP陣列中的單個元素也有Null值的概念,下面稱為Null元素。Redis服務端回覆如果是RESP陣列型別,並且RESP陣列中存在Null元素,那麼意味著元素丟失,絕對不能用空字串替代。缺少指定鍵的前提下,當與GET模式選項一起使用時,SORT命令可能會發生這種情況。

下面是一個包含Null元素的RESP陣列的例子(為了看得更清楚,分多行進行編碼,實際上不能這樣做):

*3\r\n
$3\r\n
foo\r\n
$-1\r\n
$3\r\n
bar\r\n

RESP陣列中的第2個元素是Null元素,客戶端API最終返回的內容應該是:

# Ruby
["foo",nil,"bar"]
# Java
["foo",null,"bar"]

RESP其他相關內容

主要包括:

  • 將命令傳送到Redis服務端的示例。
  • 批量命令與管道。
  • 內聯命令(Inline Commands)。

其實文件中還有一節使用C語言編寫高效能RESP解析器,這裡不做翻譯,因為掌握RESP的相關內容後,可以基於任何語言編寫解析器。

將命令傳送到Redis服務端

如果已經相對熟悉RESP中的序列化格式,那麼編寫Redis客戶端類庫就會變得很容易。我們可以進一步指定客戶端和伺服器之間的互動方式:

  • Redis客戶端向Redis服務端傳送僅僅包含定長字串型別元素的RESP陣列。
  • Redis服務端可以採用任意一種RESP資料型別向Redis客戶端進行回覆,具體的資料型別一般取決於命令型別。

下面是典型的互動例子:Redis客戶端傳送命令LLEN mylist以獲得KEYmylist的長度,Redis服務端將以整數型別進行回覆,如以下示例所示(C是客戶端,S伺服器),虛擬碼如下:

C: *2\r\n
C: $4\r\n
C: LLEN\r\n
C: $6\r\n
C: mylist\r\n

S: :48293\r\n

為了簡單起見,我們使用換行符來分隔協議的不同部分(這裡指上面的程式碼分行展示),但是實際互動的時候Redis客戶端在傳送*2\r\n$4\r\nLLEN\r\n$6\r\nmylist\r\n的時候是整體傳送的。

批量命令與管道

Redis客戶端可以使用相同的連線傳送批量命令。Redis支援管道特性,因此Redis客戶端可以通過一次寫操作傳送多個命令,而無需在傳送下一個命令之前讀取Redis服務端對上一個命令的回覆。批量傳送命令之後,所有的回覆可以在最後得到(合併為一個回覆)。更多相關資訊可以檢視Using pipelining to speedup Redis queries。

內聯命令

有些場景下,我們可能只有telnet命令可以使用,在這種條件下,我們需要傳送命令到Redis服務端。儘管Redis協議易於實現,但在互動式會話中並不理想,並且redis-cli有些情況下不一定可用。處於這類原因,Redis設計了一種專為人類設計的命令格式,稱為內聯命令(Inline Command格式。

以下是伺服器/客戶端使用內聯命令進行聊天的示例(S代表服務端,C代表客戶端):

C: PING
S: +PONG

以下是使用內聯命令返回整數的另一個示例:

C: EXISTS somekey
S: :0

基本上只需在telnet會話中編寫以空格分隔的引數。由於除了統一的請求協議之外沒有命令會以*開頭,Redis能夠檢測到這種情況並解析輸入的命令。

基於RESP編寫高效能解析器

因為JDK原生提供的位元組緩衝區java.nio.ByteBuffer存在不能自動擴容、需要切換讀寫模式等等問題,這裡直接引入Netty並且使用Netty提供的ByteBuf進行RESP資料型別解析。編寫本文的時候(2019-10-09)Netty的最新版本為4.1.42.Final。引入依賴:

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-buffer</artifactId>
    <version>4.1.42.Final</version>
</dependency>

定義解碼器介面:

public interface RespDecoder<V>{
    
    V decode(ByteBuf buffer);
}

定義常量:

public class RespConstants {

    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';

    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);

    public enum ReplyType {

        SIMPLE_STRING,

        ERROR,

        INTEGER,

        BULK_STRING,

        RESP_ARRAY
    }
}

下面的章節中解析模組的實現已經忽略第一個位元組的解析,因為第一個位元組是決定具體的資料型別。

解析簡單字串

簡單字串型別就是單行字串,它的解析結果對應的就是Java中的String型別。解碼器實現如下:

// 解析單行字串
public class LineStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}

public enum CodecUtils {

    X;

    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }

    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 計算位元組長度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置讀遊標為\r\n之後的第一個位元組
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public class RespSimpleStringDecoder extends LineStringDecoder {
    
}

這裡抽取出一個類LineStringDecoder用於解析單行字串,這樣在解析錯誤訊息的時候可以做一次繼承即可。測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // +OK\r\n
    buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:OK

解析錯誤訊息

錯誤訊息的本質也是單行字串,所以其解碼的實現可以和簡單字串的解碼實現一致。錯誤訊息資料型別的解碼器如下:

public class RespErrorDecoder extends LineStringDecoder {

}

測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // -ERR unknown command 'foobar'\r\n
    buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'

解析整型數字

整型數字型別,本質就是需要從位元組序列中還原出帶符號的64bit的長整型,因為是帶符號的,型別標識位:後的第一個位元組需要判斷是否負數字符-,因為是從左向右解析,然後每解析出一個新的位,當前的數字值要乘10。其解碼器的實現如下:

public class RespIntegerDecoder implements RespDecoder<Long> {

    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 沒有行尾,異常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 負數
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置讀遊標為\r\n之後的第一個位元組
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}

整型數字型別的解析相對複雜,一定要注意負數判斷。測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // :-1000\r\n
    buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    Long value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:-1000

解析定長字串

定長字串型別解析的關鍵是先讀取型別識別符號$後的第一個位元組序列分塊解析成64bit帶符號的整數,用來確定後面需要解析的字串內容的位元組長度,然後再按照該長度讀取後面的位元組。其解碼器實現如下:

public class RespBulkStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 使用RespIntegerDecoder讀取長度
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真實位元組內容的長度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置讀遊標為\r\n之後的第一個位元組
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

測試一下:

public static void main(String[] args) throws Exception{
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // $6\r\nthrowable\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:throwable

解析RESP陣列

RESP陣列型別解析的關鍵:

  • 先讀取型別識別符號*後的第一個位元組序列分塊解析成64bit帶符號的整數,確定陣列中的元素個數。
  • 遞迴解析每個元素。

參考過不少Redis協議解析框架,不少是用棧或者狀態機實現,這裡先簡單點用遞迴實現,解碼器程式碼如下:

public class RespArrayDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素個數
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 遞迴
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}

測試一下:

public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    //*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    List value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]

小結

RESP的內容和其編碼解碼的過程有相對深刻的認識後,就可以基於Netty編寫Redis服務的編碼解碼模組,作為Netty入門的十分有意義的例子。本文的最後一節只演示了RESP的解碼部分,編碼模組和更多細節會在另一篇用Netty實現Redis客戶端的文章中展示。

參考資料:

  • Redis Protocol specification

連結

希望你能讀到這裡,然後發現我:

  • Github Page:http://www.throwable.club/2019/10/09/redis-serialization-protocol-decode-guide
  • Coding Page:http://throwable.coding.me/2019/10/09/redis-serialization-protocol-decode-guide

附錄

本文涉及的所有程式碼:

public class RespConstants {

    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';

    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);

    public enum ReplyType {

        SIMPLE_STRING,

        ERROR,

        INTEGER,

        BULK_STRING,

        RESP_ARRAY
    }
}

public enum CodecUtils {

    X;

    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }

    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 計算位元組長度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置讀遊標為\r\n之後的第一個位元組
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public interface RespCodec {

    RespCodec X = DefaultRespCodec.X;

    <IN, OUT> OUT decode(ByteBuf buffer);

    <IN, OUT> ByteBuf encode(IN in);
}

public enum DefaultRespCodec implements RespCodec {

    X;

    static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
    private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();

    static {
        DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
        DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
        DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
        DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
        DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
    }

    @SuppressWarnings("unchecked")
    @Override
    public <IN, OUT> OUT decode(ByteBuf buffer) {
        return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
    }

    private ReplyType determineReplyType(ByteBuf buffer) {
        byte firstByte = buffer.readByte();
        ReplyType replyType;
        switch (firstByte) {
            case RespConstants.PLUS_BYTE:
                replyType = ReplyType.SIMPLE_STRING;
                break;
            case RespConstants.MINUS_BYTE:
                replyType = ReplyType.ERROR;
                break;
            case RespConstants.COLON_BYTE:
                replyType = ReplyType.INTEGER;
                break;
            case RespConstants.DOLLAR_BYTE:
                replyType = ReplyType.BULK_STRING;
                break;
            case RespConstants.ASTERISK_BYTE:
                replyType = ReplyType.RESP_ARRAY;
                break;
            default: {
                throw new IllegalArgumentException("first byte:" + firstByte);
            }
        }
        return replyType;
    }

    @Override
    public <IN, OUT> ByteBuf encode(IN in) {
        // TODO
        throw new UnsupportedOperationException("encode");
    }
}

public interface RespDecoder<V> {

    V decode(ByteBuf buffer);
}

public class DefaultRespDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        throw new IllegalStateException("decoder");
    }
}

public class LineStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}

public class RespSimpleStringDecoder extends LineStringDecoder {

}

public class RespErrorDecoder extends LineStringDecoder {

}

public class RespIntegerDecoder implements RespDecoder<Long> {

    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 沒有行尾,異常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 負數
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置讀遊標為\r\n之後的第一個位元組
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}

public class RespBulkStringDecoder implements RespDecoder<String> {

    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真實位元組內容的長度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置讀遊標為\r\n之後的第一個位元組
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}

public class RespArrayDecoder implements RespDecoder {

    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素個數
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 遞迴
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}

(本文完 e-a-20191009 c-2-d