1. 程式人生 > >Redis底層詳解(四) 整數集合

Redis底層詳解(四) 整數集合

一、集合概述

        對於集合,STL 的 set 相信大家都不陌生,它的底層實現是紅黑樹。無論插入、刪除、查詢都是 O(log n) 的時間複雜度。當然,如果用雜湊表來實現集合,插入、刪除、查詢都可以達到 O(1)。那麼為什麼集合要用紅黑樹和沒有用雜湊表呢?我想,最大的可能是基於集合自身的特性,集合有它特有的操作:求交、求並、求差。這三個操作對於雜湊表來說都是 O(n) 的。基於這一點,相比無序的雜湊表來說,採用有序的紅黑樹會更加合適。

二、Redis 整數集合(intset)

        今天要講的整數集合,又稱為 intset,是 Redis 特有的資料結構。它的實現既不是紅黑樹,也不是雜湊表。就是簡單的陣列加上記憶體編碼。當儲存元素較少( 元素個數上限定義在server.h 的 OBJ_SET_MAX_INTSET_ENTRIES 巨集定義值為512)且均為整型時,才會使用到整數集合。它的查詢是 O(log n) 的,插入和刪除都是 O(n) 的。但是由於儲存元素相對較少的時候,O(log n) 和 O(n) 差距不是很大,但是用 Redis 的這種整數集合,相比紅黑樹和雜湊表來說,可以大大減少記憶體。
        所以,Redis 的 整數集合 intset 的存在主要還是為了節省記憶體。

        1、intset 結構定義
        intset 結構定義在 intset.h 中:

#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

typedef struct intset {
    uint32_t encoding;      /* a */
    uint32_t length;        /* b */
    int8_t contents[];      /* c */
} intset;

        a) encoding 指定了編碼方式,總共有 INTSET_ENC_INT16、INTSET_ENC_INT32、INTSET_ENC_INT64 三種。從巨集定義可以看出,這三個值分別為 2、4、8。從字面意思可以看出三者能表示的範圍是 16位整數、32位整數 以及 64位整數。
        b) length 儲存了整數集合的元素個數。
        c) contents 為整數集合的柔性陣列,元素型別並不一定是 int8_t 型別的。 contents 不佔用結構體的大小,它只作為整數集合資料的首指標。整數集合中的元素按照從小到大的順序在 contents 中排列起來。

        2、編碼方式
        首先,我們來理解編碼方式 encoding 的含義。需要明確的一點是,對於一個整數集合來說,所有的元素的編碼一定是一致的(否則每個數都得存一個編碼,而不是將它存在 intset 結構體內了),那麼整個整數集合的編碼取決於集合中“絕對值”最大的那個數(之所以是絕對值,因為整數包含正數和負數)。
        通過那個絕對值最大的整數來獲取編碼,實現如下:

static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}

        這段程式碼的含義是,如果整數 v 不能用 32位整數表示,那麼就需要用 INTSET_ENC_INT64 編碼;如果不能用 16位整數表示,那麼就需要用 INTSET_ENC_INT32 編碼;否則,採用 INTSET_ENC_INT16 編碼就行。核心就是:能用2個位元組表示就不用4個位元組,能用4個位元組表示就不用8個位元組,能省則省。
        幾個巨集定義在 stdint.h 中,如下:

/* Minimum of signed integral types. */ 
# define INT16_MIN      (-32767-1)  
# define INT32_MIN      (-2147483647-1)  

/* Maximum of signed integral types. */  
# define INT16_MAX      (32767)  
# define INT32_MAX      (2147483647)  

        3、編碼升級
        當前編碼方式不足以儲存更大位數的整數時,需要升級編碼。舉個例子,下圖所示的四個數字都在 [ -32768, 32767 ] 範圍內,所以採用 INTSET_ENC_INT16 編碼即可。contents 的陣列長度為 sizeof(int16_t) * 4 = 2 * 4 = 8 個位元組 ( 即64個二進位制位 )。

        然後我們插入一個數,它的值為 32768,比 INT16_MAX 大1,所以它需要採用 INTSET_ENC_INT32 編碼,而整數集合中所有的數的編碼需要保持一致。那麼,所有數的編碼都需要轉為 INTSET_ENC_INT32 編碼。這就是 “升級”。如圖所示:

        升級完後,contents 陣列的長度變為 sizeof(int32_t) * 5 = 4 * 5 = 20 個位元組 ( 即160個二進位制位 )。而且每個元素佔用的記憶體都擴大一倍,所在的相對位置也發生了變化,導致所有的元素都需要往高位記憶體遷移。
        那我們一開始就把所有的整數集合都用 INTSET_ENC_INT64 來編碼不就好了,還省得麻煩。原因是 Redis 設計 intset 的初衷還是為了節省記憶體,設想一個集合的元素永遠都不會超過 16位 整數,那麼用 64位整數的話,相當於浪費了 3倍 的記憶體。

三、整數集合常用操作

        1、建立集合
        建立一個整數集合 intsetNew,實現在 intset.c 中:

intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

       初始建立的整數集合為空集合,用 zmalloc 進行記憶體分配後,定義編碼為 INTSET_ENC_INT16,這樣可以使記憶體儘量小。這裡需要注意的是,intset 的儲存直接涉及到記憶體編碼,所以需要考慮主機的位元組序問題(相關資料請參閱:位元組序)。
       intrev32ifbe 的意思是 int32 reversal if big endian。即 如果當前主機位元組序為大端序,那麼將它的記憶體儲存進行翻轉操作。簡言之,intset 的所有成員儲存方式都採用小端序。所以建立一個空的整數集合,記憶體分佈如下:

       瞭解了整數集合的記憶體編碼以後,我們來看看它的 設定 (set)和 獲取(get)。
       2、元素設定
       設定 的含義就是給定整數集合以及一個位置和值,將值設定到這個整數集合的對應位置上。_intsetSet 實現如下:

static void _intsetSet(intset *is, int pos, int64_t value) {
    uint32_t encoding = intrev32ifbe(is->encoding);          /* a */

    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;               /* b */
        memrev64ifbe(((int64_t*)is->contents)+pos);          /* c */
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}

       a) 大端序和小端序只是儲存方式,encoding 在儲存的時候進行了一次 intrev32ifbe 轉換,取出來用的時候需要再進行一次 intrev32ifbe 轉換(其實就是序列化和反序列化)。
       b) 根據 encoding 的型別,將 contents 轉換成指定型別的指標,然後用 pos 進行索引找到對應的記憶體位置,然後將 value 的值設定到對應的記憶體中。
       c) memrev64ifbe 的實現參見 位元組序 的 memrev64 函式,即將對應記憶體的值轉換成小端序儲存。

       3、元素獲取
       獲取 的含義就是給定整數集合以及一個位置,返回給定位置的元素的值。_intsetGet 實現如下:

static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));   /* a */
        memrev64ifbe(&v64);                                      /* b */
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}

static int64_t _intsetGet(intset *is, int pos) {
    return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}

       a) 根據 encoding 的型別,將 contents 轉換成指定型別的指標,然後用 pos 進行索引找到對應的記憶體位置,將記憶體位置上的值拷貝到臨時變數中;
       b) 由於是直接的記憶體拷貝,所以取出來的值還是小端序的,那麼在大端序的主機上得到的值是不對的,所以需要再做一次 memrev64ifbe 轉換將值還原。

       4、元素查詢
       由於整數集合是有序集合,所以查詢某個元素是否在整數集合中,Redis 採用的是二分查詢。intsetSearch 實現如下:

static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;                                        /* a */
        return 0;
    } else {                                                      /* b */
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }
    while(max >= min) {                                          
        mid = ((unsigned int)min + (unsigned int)max) >> 1;       /* c */
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }
    if (value == cur) {                                           /* d */
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}

       a) 整數集合為空,返回0表示查詢失敗;
       b) value 的值比整數集合中的最大值還大,或者比最小值還小,則返回0表示查詢失敗;
       c) 執行二分查詢,將找到的值存在 cur 中;
       d) 如果找到則返回1,表示查詢成功,並且將 pos 設定為 mid 並返回;如果沒找到則返回一個需要插入的位置。

       5、記憶體重分配
       由於 contents 的記憶體是動態分配的,所以每次進行元素插入或者刪除的時候,都需要重新分配記憶體,這個實現放在 intsetResize 中,實現如下:

static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}

       encoding 本身表示位元組個數,所以乘上集合個數 len 就是 contents 陣列需要的總位元組數了,呼叫 zrealloc 進行記憶體重分配,然後返回重新分配後的地址。
       注意:zrealloc 的返回值必須返回出去,因為 intset 在進行記憶體重分配以後,地址可能就變了。即 is = zrealloc(is, ...) 中,此 is 非彼 is。所以,所有呼叫 intsetResize 的函式都需要連帶的返回新的 intset 指標。
       6、編碼升級
       編碼升級一定發生在元素插入,並且插入的元素的絕對值比整數集合中的元素都大的時候,所以我們把升級後的元素插入和編碼升級放在一個函式實現,名曰 intsetUpgradeAndAdd,實現如下:

static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);                             
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;                                         /* a */
    is->encoding = intrev32ifbe(newenc);
    is = intsetResize(is,intrev32ifbe(is->length)+1);                        /* b */
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));   /* c */
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);                       /* d */
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

       a) curenc 記錄升級前的編碼,newenc 記錄升級後的編碼;
       b) 將整數集合 is 的編碼設定成新的編碼後,進行記憶體重分配;
       c) 獲取原先記憶體中的資料,設定到新記憶體中(注意:由於兩段記憶體空間是重疊的,而且新記憶體的長度一定大於原先記憶體,所以需要從後往前進行拷貝);
       d) 當插入的值 value 為負數的時候,為了保證集合的有序性,需要插入到 contents 的頭部;反之,插入到尾部;當 value 為負數時 prepend 為1,這樣就可以保證在記憶體拷貝的時候將第 0 個位置留空。
       如圖展示了一個 (-32768, 0, 1, 32767) 的整數集合在插入數字 32768 後的升級的完整過程:

        整數集合升級的時間複雜度是 O(n) 的,但是在整數集合的生命期內,升級最多發生兩次(從 INTSET_ENC_INT16 到 INTSET_ENC_INT32 以及 從 INTSET_ENC_INT32 到 INTSET_ENC_INT64)。
        7、記憶體遷移
        絕大多數情況都是在執行 插入 、刪除 、查詢 操作。插入 和 刪除 會涉及到連續記憶體的移動。Redis 的內部實現中有一個函式 intsetMoveTail 就是用來實現記憶體移動的。

static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;   /* a */
    uint32_t encoding = intrev32ifbe(is->encoding);

    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;                   
        dst = (int64_t*)is->contents+to;              
        bytes *= sizeof(int64_t);                     /* b */
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);                           /* c */
}

       a) 統計從 from 到結尾,有多少個元素;
       b) 根據不同的編碼,計算出需要拷貝的記憶體位元組數 bytes,以及拷貝源位置 src,拷貝目標位置 dst;
       c) memmove 是 string.h 中的函式:src指向的記憶體區域拷貝 bytes 個位元組到 dst 所指向的記憶體區域,這個函式是支援記憶體重疊的;

       8、元素插入
       最後,講整數集合的插入和刪除,插入呼叫的是 intsetAdd,在 intset.c 中實現:

intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;
    if (valenc > intrev32ifbe(is->encoding)) {                               /* a */
        return intsetUpgradeAndAdd(is,value);
    } else {
        if (intsetSearch(is,value,&pos)) {                                
            if (success) *success = 0;                                       /* b */
            return is;
        }
        is = intsetResize(is,intrev32ifbe(is->length)+1);                    /* c */
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);    /* d */
    }
    _intsetSet(is,pos,value);                                                 
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);                   /* e */
    return is;
}

       a) 插入的數值 value 的記憶體編碼大於現有集合的編碼,直接呼叫 intsetUpgradeAndAdd 進行編碼升級;
       b) 集合元素是不重複的,如果 intsetSearch 能夠找到,則將 success 置為0,表示此次插入失敗;
       c) 如果 intsetSearch 找不到,將 intset 進行記憶體重分配,即 長度 加 1。
       d) pos 為 intsetSearch 過程中找到的 value 將要插入的位置,我們將 pos 以後的記憶體向後移動1個單位 (這裡的1個單位可能是2個位元組、4個位元組或者8個位元組,取決於當前整數集合的記憶體編碼)。
       e) 呼叫 _intsetSet 將 value 的值設定到 pos 的位置上,然後給成員變數 length 加 1。最後返回 intset 指標首地址,因為其間進行了 intsetResize,傳入的 intset 指標和返回的有可能不是同一個了。

       9、元素刪除
       刪除元素呼叫的是 intsetRemove ,實現如下:

intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {  /* a */
        uint32_t len = intrev32ifbe(is->length);
        if (success) *success = 1;
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);                        /* b */
        is = intsetResize(is,len-1);                                            /* c */
        is->length = intrev32ifbe(len-1); 
    }
    return is;
}

       a) 當整數集合中存在 value 這個元素時才能執行刪除操作;
       b) 如果能通過 intsetSearch 找到元素,那麼它的位置就在 pos 上,這是通過 intsetMoveTail 將記憶體往前挪;
       c) intsetResize 重新分配記憶體,並且將集合長度減1;