1. 程式人生 > 實用技巧 >Redis 3.0.4 壓縮列表

Redis 3.0.4 壓縮列表

  壓縮列表是列表鍵和雜湊鍵的底層實現之一。

  1.壓縮列表的構成

    1.1.ziplist結構如圖  

    1. zlbytes(4位元組) : 表示壓縮表的總長;
    2. zltail(4位元組):表示壓縮列表表尾節點距離列表的起始地點有多少位元組;
    3. zllen(2位元組):表示壓縮列表包含的節點數量(當值小於uint16_max 65535時,節點的真實數量需要遍歷整個壓縮列表才能得出);
    4. entryX(不定):表示壓縮列表的各個節點節點的長度由節點儲存的內容決定;
    5. zlend(1位元組):特殊值0xFF(255),用於標記壓縮列表的末尾。
//將zl定義到前四個位元組的bytes成員 記錄整個壓縮列表的記憶體位元組數
#define ZIPLIST_BYTES(zl) (*((uint32_t*)(zl))) //將zl定位到4位元組到8位元組的tail_offset成員 記錄整個壓縮列表的記憶體位元組數 #define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t)))) //將zl定位到8位元組到10位元組的length成員,記錄著壓縮列表的節點數量 #define ZIPLIST_LENGTH(zl) (*((uint16_t*)((zl)+sizeof(uint32_t)*2))) //壓縮列表表頭的大小10個位元組 #define
ZIPLIST_HEADER_SIZE (sizeof(uint32_t)*2+sizeof(uint16_t)) //返回壓縮列表首節點的地址 #define ZIPLIST_ENTRY_HEAD(zl) ((zl)+ZIPLIST_HEADER_SIZE) //返回壓縮列表尾節點的地址 #define ZIPLIST_ENTRY_TAIL(zl) ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) //返回end成員的地址 一個位元組 #define ZIPLIST_ENTRY_END(zl) ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)

    1.2.建立一個空的壓縮列表

//壓縮列表總共有11個位元組的固定長度
//建立並返回一個新的壓縮列表
unsigned char *ziplistNew(void) {
    //ZIPLIST_HEADER_SIZE 是壓縮列表的表頭 1位元組是末端的end大小
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);  //為表頭和表尾end成員分配空間
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);   //將bytes成員初始化為bytes=11
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); //空列表的tail_offset成員為表頭大小為10位元組
    ZIPLIST_LENGTH(zl) = 0; //位元組數量為0
    zl[bytes-1] = ZIP_END;   //將表尾end成員設定預設的255
    return zl;
}

  2.壓縮列表節點的構成

typedef struct zlentry {
    //prevrawlen 前驅節點的長度
    //prevrawlensize 編碼前驅節點的長度prevrawlen所需的位元組大小
    unsigned int prevrawlensize, prevrawlen;
    //len 當前節點值長度
    //lensize 編碼當前節點長度len所需的位元組數
    unsigned int lensize, len;
    //當前節點header的大小 = lensize + prevrawlensize
    unsigned int headersize;
    //當前節點的編碼方式
    unsigned char encoding;
    //指向當前節點的指標,以char *型別儲存
    unsigned char *p;
} zlentry; //壓縮列表節點資訊的結構

    壓縮列表節點的定義是zlentry,但是實質上儲存節點是利用了以下的結構

      1. previous_entry_length:記錄前驅節點的長度;
      2. encoding:即記錄當前節點的value成員的資料型別及長度;
      3. value:根據encoding來儲存位元組陣列或者整數。

      2.1 prevois_entry_length

        記錄前驅節點的長度,以位元組為單位。長度可以是1位元組或者5位元組。

        1>. 如果前一位元組的長度小於254,那麼previous_entry_length長度為1位元組,

        2>.如果前一位元組的長度大於等於254,那麼previous_entry_length長度為5位元組,其中第一個位元組是0XFE(254),而之後的四個位元組表示前一節點的長度。

      2.2 encoding

      記錄了節點的content屬性所儲存資料的型別以及長度。

        1.一位元組、兩位元組或者五位元組,值的最高位為00、01、10的位元組陣列編碼,這種編碼表示節點的content屬性儲存著位元組陣列,位元組陣列的長度由編碼出掉最高兩位之後的其他記錄。

        2.一位元組長,值的最高位以11開頭的是整數編碼:這種編碼表示節點的content屬性儲存著整數值,整數值的型別和長度由編碼除去最高兩位之後的其他記錄。

      2.3 content

        節點的content屬性負責儲存節點的值,節點值可以是一個節點陣列或者整數,值的型別和長度由節點的encoding屬性決定。

  3.連鎖更新

    在壓縮列表節點的結構裡previous_entry_length是記錄前一個節點的長度,如果前一個節點的長度小於254,那麼previous_entry_length需要1個位元組空間儲存前驅節點的長度值,如果前一個節點的長度大於等於254,那麼previous_entry_length需要5個節點空間儲存前驅節點的長度值。

    假設存在一種情況,壓縮列表裡面的節點長度都是小於254位元組,e1~eN位元組數都在250~253之間,這時有一個長度大於254位元組的new節點,需要插入到e1前,插入之後由於e1節點的previoud_entry_length是一個位元組的,不能儲存new節點的位元組長度,需要碎e1節點進行擴充套件,同理,e1擴充套件之後位元組大小大於等於254,則e1的後繼節點e2節點的previous_entry_length不能儲存e1擴充套件知乎的位元組長度,也需要對e2擴充套件,以此類推,之後的節點都需要擴充套件,所以連鎖更新最壞的情況下需要對壓縮列表執行N次空間重新分配操作,而每次空間分配操作的最壞的時間複雜度是O(n),所以連鎖更新最壞的時間複雜度是O(N^2)。

    如果e1~eN位元組數都是大於254,此時有一個長度小於254的new節點需要插入到e1前,也會產生連鎖更新,並且在刪除節點也會有連鎖更新。

  連鎖更新的程式碼如下:

//將列表zk的p指標針對的next節點進行移動
//
static unsigned char *__ziplistCascadeUpdate(unsigned char *zl, unsigned char *p) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), rawlen, rawlensize; //獲取當前列表的長度
    size_t offset, noffset, extra;
    unsigned char *np;
    zlentry cur, next;
    //ZIP_END標識尾節點
    while (p[0] != ZIP_END) {
        cur = zipEntry(p);
        rawlen = cur.headersize + cur.len;
        rawlensize = zipPrevEncodeLength(NULL,rawlen); //計算當前節點長度所需的位元組數

        /* Abort if there is no next entry. */
        if (p[rawlen] == ZIP_END) break;
        next = zipEntry(p+rawlen);  //獲取p的next節點

        /* Abort when "prevlen" has not changed. */
        if (next.prevrawlen == rawlen) break;
        //如果next節點儲存的前驅節點長度所需的位元組大小 小於 前驅節點所需的位元組大小
        if (next.prevrawlensize < rawlensize) {
            /* The "prevlen" field of "next" needs more bytes to hold
             * the raw length of "cur". */
            offset = p-zl; //p節點的偏移量
            extra = rawlensize-next.prevrawlensize; //需要額外增長的位元組大小
            zl = ziplistResize(zl,curlen+extra); //resize 連結串列所需空間大小
            p = zl+offset;

            /* Current pointer and offset for next element. */
            np = p+rawlen;  //next節點的新地址
            noffset = np-zl;  //next節點的偏移量
            //調整列表zltail的大小
            /* Update tail offset when next element is not the tail element. */
            if ((zl+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))) != np) {
                ZIPLIST_TAIL_OFFSET(zl) =
                    intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+extra);
            }

            /* Move the tail to the back. */
            //更新 移動next節點除header的prevrawlensize之外的剩餘list的記憶體
            //從next節點的prev_entry_len欄位之後的記憶體開始 到zlend之前的內容 
            memmove(np+rawlensize,
                np+next.prevrawlensize,
                curlen-noffset-next.prevrawlensize-1);
            //更新  將next節點的header以rawlen長度重新編碼,更新prevrawlensize和prevrawlen 
            //更新next節點的previous_entry_length欄位為rawlen
            zipPrevEncodeLength(np,rawlen);

            /* Advance the cursor */
            p += rawlen;
            curlen += extra;
        } else {
            if (next.prevrawlensize > rawlensize) {
                /* This would result in shrinking, which we want to avoid.
                 * So, set "rawlen" in the available bytes. */
                zipPrevEncodeLengthForceLarge(p+rawlen,rawlen);
            } else {
                zipPrevEncodeLength(p+rawlen,rawlen);
            }

            /* Stop here, as the raw length of "next" has not changed. */
            break;
        }
    }
    return zl;
}

  是根據給定的zl節點,然後通過更新p節點針對的next節點進行移動。首先獲取p節點的位元組長度和長度,來和p的next節點儲存的previous_entry_length比較大小,如果next.prevrawlensize < rawlensize,那麼需要對next.prev_entry_len欄位之後的記憶體開始,到zlend之前的內容,進行memmove,來為next節點記錄其前驅節點獲取到足夠的位元組大小。

  4.刪除節點

  從ziplist中,p指向的節點開始,最多刪除num個節點

//從p節點開始最多刪除num個節點
static unsigned char *__ziplistDelete(unsigned char *zl, unsigned char *p, unsigned int num) {
    unsigned int i, totlen, deleted = 0;
    size_t offset;
    int nextdiff = 0;
    zlentry first, tail;

    first = zipEntry(p);
    for (i = 0; p[0] != ZIP_END && i < num; i++) {
        p += zipRawEntryLength(p); //需要刪除的總位元組數
        deleted++; //需要刪除的節點數
    }

    totlen = p-first.p; //需要刪除的總位元組數
    if (totlen > 0) {
        if (p[0] != ZIP_END) {
            /* Storing `prevrawlen` in this entry may increase or decrease the
             * number of bytes required compare to the current `prevrawlen`.
             * There always is room to store this, because it was previously
             * stored by an entry that is now being deleted. */
            nextdiff = zipPrevLenByteDiff(p,first.prevrawlen);
            p -= nextdiff;  
            zipPrevEncodeLength(p,first.prevrawlen);
            
            /* Update offset for tail */
            //更新zltail  表尾據壓縮列表的起始地址的位元組數
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))-totlen);
            
            /* When the tail contains more than one entry, we need to take
             * "nextdiff" in account as well. Otherwise, a change in the
             * size of prevlen doesn't have an effect on the *tail* offset. */
            tail = zipEntry(p);
            if (p[tail.headersize+tail.len] != ZIP_END) {
                ZIPLIST_TAIL_OFFSET(zl) =
                   intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
            }
            
            /* Move tail to the front of the ziplist */
            //移動p節點到zlend節點至first.p 
            memmove(first.p,p,
                intrev32ifbe(ZIPLIST_BYTES(zl))-(p-zl)-1);
        } else {
            /* The entire tail was deleted. No need to move memory. */
            //p節點之後的所有元素都刪除了
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe((first.p-zl)-first.prevrawlen);
        }
        * Resize and update length */
        offset = first.p-zl;
        zl = ziplistResize(zl, intrev32ifbe(ZIPLIST_BYTES(zl))-totlen+nextdiff);
        ZIPLIST_INCR_LENGTH(zl,-deleted);
        p = zl+offset;

        /* When nextdiff != 0, the raw length of the next entry has changed, so
         * we need to cascade the update throughout the ziplist */
        //表示p節點的prev_raw_len_size需要擴容或者縮容  需要連鎖更新 
       if (nextdiff != 0)
            zl = __ziplistCascadeUpdate(zl,p);
    }
    return zl;
}

  刪除節點,需要知道要刪除的位元組長度,和節點數,需要處理的是如果刪除完之後的後續節點不是zlend,則要更新p節點的prevrawlen,並且更新zltail的長度。

  5.插入節點

    插入到p所指向的節點

/* Insert item at "p". */
//插入到p所指向的節點
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;


    /* Find out prevlen for the entry that is inserted. */
    //獲取p的前繼節點的長度prevlen
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    //計算s節點的value encoding
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    //獲取新節點的總長度
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    reqlen += zipEncodeLength(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    //c節點要插入到p節點的前面  計算p的prev_entry_len 和要插入的s節點的節點長度的差值
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        //p+reqlen表示p節點的新地址  nextdiff表示前繼節點是否需要增大或者減少  
        //curlen-offset-1+nextdiff  表示移動長度
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        //更新移動之後的p節點的prev_raw_length
        zipPrevEncodeLength(p+reqlen,reqlen);

        /* Update offset for tail */
        //更新 zltail
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        tail = zipEntry(p+reqlen);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipPrevEncodeLength(p,prevlen);
    p += zipEncodeLength(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}