1. 程式人生 > >Redis之ziplist原始碼分析

Redis之ziplist原始碼分析

一、ziplist簡介

從上一篇分析我們知道quicklist的底層儲存使用了ziplist(壓縮列表),由於壓縮列表本身也有不少內容,所以重新開了一篇,在正式原始碼之前,還是先看下ziplist的特點:

1. ziplist是一種特殊編碼的雙向列表,特殊編碼是為了節省儲存空間。

2. ziplist允許同時存放字串和整型型別,並且整型數被編碼成真實的整型數而不是字串序列(節省空間)。

3. ziplist列表支援在頭部和尾部進行push和pop操作的時間複雜度都在常量範圍O(1),但是每次操作都涉及記憶體重新分配,尤其在頭部操作時,會涉及大段的記憶體移動操作,增加了操作的複雜性。

上面粗體部分會在下面的程式碼分析中一一體現(ziplist.h和ziplist.c)。

二、ziplist資料結構

 下面我們先看一下ziplist的結構示意圖:

 

上面示意圖展示了ziplist的整體結構,由於ziplist和entry的長度是不定長的,因此程式碼中也沒有這兩個介面的定義,這裡先給出一個示意結構定義,方便理解:

struct ziplist<T>{
    unsigned int zlbytes; // ziplist的長度位元組數,包含頭部、所有entry和zipend。
    unsigned int zloffset; // 從ziplist的頭指標到指向最後一個entry的偏移量,用於快速反向查詢
    unsigned short int zllength; // entry元素個數
    T[] entry;              // 元素值
    unsigned char zlend;   // ziplist結束符,值固定為0xFF
}

struct entry{
  char[var] prevlen; // 前面一個entry的位元組長度值。
  char[var] encoding; // 元素編碼型別
  char[] content;  // 元素內容
}

程式碼中對ziplist的變數的讀取和賦值都是通過巨集來實現的,如下:

#define ZIPLIST_BYTES(zl)       (*((uint32_t*)(zl)))
#define ZIPLIST_TAIL_OFFSET(zl) (*((uint32_t*)((zl)+sizeof(uint32_t))))
#define ZIPLIST_LENGTH(zl)      (*((uint16_t*)((zl)+sizeof(uint32_t)*2)))
#define ZIPLIST_HEADER_SIZE     (sizeof(uint32_t)*2+sizeof(uint16_t))
#define ZIPLIST_END_SIZE        (sizeof(uint8_t))
#define ZIPLIST_ENTRY_HEAD(zl)  ((zl)+ZIPLIST_HEADER_SIZE)
#define ZIPLIST_ENTRY_TAIL(zl)  ((zl)+intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl)))
#define ZIPLIST_ENTRY_END(zl)   ((zl)+intrev32ifbe(ZIPLIST_BYTES(zl))-1)

entry的結構要稍微複雜一些了,裡面對prevlen和encoding做了特殊編碼以節省空間,ziplist的精髓也正是在這裡體現的。

首先看prevlen賦值方法的原始碼:

/* Encode the length of the previous entry and write it to "p". Return the
 * number of bytes needed to encode this length if "p" is NULL. 
   把前一個entry的長度編碼後寫入當前entry的prevLen欄位,編碼規則:
   1. 如果len<254,prevLen佔用一個位元組,並寫入當前entry的第一個位元組。
   2. 如果len>=254,prevLen佔用五個位元組,第一個位元組固定寫入254,第二個至第五個位元組寫入實際的長度。
 */
static unsigned int zipPrevEncodeLength(unsigned char *p, unsigned int len) {
    if (p == NULL) { // 此時只是計算len所需的儲存長度
        return (len < ZIP_BIGLEN) ? 1 : sizeof(len)+1;
    } else {
        if (len < ZIP_BIGLEN) {
            p[0] = len;
            return 1;
        } else {
            p[0] = ZIP_BIGLEN;
            memcpy(p+1,&len,sizeof(len));
            memrev32ifbe(p+1);
            return 1+sizeof(len);
        }
    }
}
下面再來分析encoding欄位,redis根據儲存元素的值做不同的編碼(long long型別和String型別),long long型別編碼也是為了節省空間,其是在zipTryEncoding方法中進行:
/* Check if string pointed to by 'entry' can be encoded as an integer.
 * Stores the integer value in 'v' and its encoding in 'encoding'. 
 當儲存內容可以轉化為long long型別時,encoding佔用一個位元組,其中前2位固定都是1,後面6位根據value值大小不同,具體如下:
    a. OX11000000 表示content內容是int16,長度是2個位元組。
    b. OX11010000 表示content內容是int32,長度是4個位元組。
    c. OX11100000 表示content內容是int64,長度是8個位元組。
    d. OX11110000 表示content內容是int24,長度是3個位元組。
    e. OX11111110 表示content內容是int8,長度是1個位元組。
    f. OX11111111 表示ziplist的結束。
    g. 0X1111xxxx 表示極小數,儲存0-12的值,由於0000和1111都不能使用,所以它的實際值將是1至13,程式在取得這4位的值之後,還需要減去1,才能計算出正確的值,比如說,如果後4位為0001 = 1,那麼程式返回的值將是1-1=0。
 */
static int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

    if (entrylen >= 32 || entrylen == 0) return 0;
    if (string2ll((char*)entry,entrylen,&value)) {
        /* Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}

上述方法定義了是否可以編碼為long long型別,如果不能,則編碼為String型別並賦值,編碼程式碼在zipEncodeLength方法:

/* Encode the length 'rawlen' writing it in 'p'. If p is NULL it just returns
 * the amount of bytes required to encode such a length. 
 本方法對encoding是String型別時,進行編碼並賦值(如果entry內容可以轉化為long long型別,在zipTryEncoding方法中進行編碼),並根據不同長度的字串來編碼encoding的值,具體如下:
    a. 0X00xxxxxx 前兩位00表示最大長度為63的字串,後面6位表示實際字串長度,encoding佔用1個位元組。
    b. 0X01xxxxxx xxxxxxxx 前兩位01表示中等長度的字串(大於63小於等於16383),後面14位表示實際長度,encoding佔用兩個位元組。
    c. OX10000000 xxxxxxxx xxxxxxxx xxxxxxxx 表示特大字串,第一個位元組固定128(0X80),後面四個位元組儲存實際長度,encoding佔用5個位元組。
 */
static unsigned int zipEncodeLength(unsigned char *p, unsigned char encoding, unsigned int rawlen) {
    unsigned char len = 1, buf[5];

    if (ZIP_IS_STR(encoding)) {
        /* Although encoding is given it may not be set for strings,
         * so we determine it here using the raw length. */
        if (rawlen <= 0x3f) {
            if (!p) return len;
            buf[0] = ZIP_STR_06B | rawlen;
        } else if (rawlen <= 0x3fff) {
            len += 1;
            if (!p) return len;
            buf[0] = ZIP_STR_14B | ((rawlen >> 8) & 0x3f);
            buf[1] = rawlen & 0xff;
        } else {
            len += 4;
            if (!p) return len;
            buf[0] = ZIP_STR_32B;
            buf[1] = (rawlen >> 24) & 0xff;
            buf[2] = (rawlen >> 16) & 0xff;
            buf[3] = (rawlen >> 8) & 0xff;
            buf[4] = rawlen & 0xff;
        }
    } else {
        /* Implies integer encoding, so length is always 1. */
        if (!p) return len;
        buf[0] = encoding;
    }

    /* Store this length at p */
    memcpy(p,buf,len);
    return len;
}

 

三、ziplist增刪改查

 1. 建立ziplist

在執行lpush命令時,如果當前quicklistNode是新建的,則需要新建一個ziplist:

/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. 
 在quicklist的頭部節點新增新元素:
 如果新元素新增在head中,返回0,否則返回1.
 */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    // 如果head不為空,且空間大小滿足新元素的儲存要求,則新元素新增到head中,否則新加一個quicklistNode
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // 建立新的quicklistNode
        quicklistNode *node = quicklistCreateNode();
        // 把新元素新增到新建的ziplist中
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        // 更新ziplist的長度到quicklistNode的sz欄位
        quicklistNodeUpdateSz(node);
        // 把新node新增到quicklist中,即新增到原head前面
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}
/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

 

 2. 新增entry

新增entry的程式碼在ziplistPush方法中:

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}

/* Insert item at "p". zl中新增一個元素 */
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        // 當之前的操作從尾巴刪除元素時,ZIPLIST_ENTRY_TAIL指標會向前遷移,此時ptail[0] != ZIP_END
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    // 檢查entry的value是否可以編碼為long long型別,如果可以就把值儲存在value中,
    // 並把所需最小位元組長度儲存在encoding
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    reqlen += zipEncodeLength(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    // reqlen是zlentry所需大小,nextdiff是待插入位置原entry中prelen與新entry中prelen所需儲存空間的大小差值。
    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        // 原資料向後移動,騰出空間寫入新的zlentry
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        // 新entry的長度寫入下一個zlentry的prelen
        zipPrevEncodeLength(p+reqlen,reqlen);

        /* Update offset for tail */
        // 更新ZIPLIST_TAIL_OFFSET指向原來的tail entry。
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        zipEntry(p+reqlen, &tail);
        // 如果原插入位置的entry不是最後的tail元素,需要調整ZIPLIST_TAIL_OFFSET值(增加nextdiff)
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        // ZIPLIST_TAIL_OFFSET指向新加的entry,即新加的entry是tail元素
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        // 如果nextdiff不為0,需要迴圈更新後續entry中的prelen,最差情況下,所有entry都需要更新一遍
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    // 給新加的entry賦值
    p += zipPrevEncodeLength(p,prevlen);
    p += zipEncodeLength(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

從上面程式碼可以看出,如果是在頭部新增元素時,需要把執行memmove方法把當前ziplist中的所有元素後移一段距離,消耗還是比較大的。

 3. 刪除entry

刪除操作在ziplistDelete方法中實現,其邏輯和新增剛剛相反,就不再贅述了。

至此,ziplist的主體程式碼就分析結束了,從程式碼可以看到,ziplist的實現非常精妙,儘可能的節省儲存空間,但是在頭部操作時,會有大量的記憶體移動操作,消耗挺大,在尾部操作時,無記憶體移動,效率則要高很多。

本篇內容參考了錢文品的《Redis深度歷險:核心原理與應用實踐》,特此感