1. 程式人生 > >Redis原始碼剖析--整數集合Intset

Redis原始碼剖析--整數集合Intset

本系列部落格文章已經分析了Redis的大部分資料結構,包括動態字串,雙端連結串列,字典,跳躍表等,這些資料結構都非常強大實用,但是在記憶體消耗方面也非常“巨大”。Redis的資料都是存放在記憶體上面的,所以對記憶體的使用要求及其苛刻,Redis會想方設法的來節省記憶體。 假設有一組集合1,2,3,6,5,如果採用上述的資料結構來儲存的話,必然會付出昂貴的記憶體代價,因此,Redis在這種小資料量的條件下,會使用記憶體對映來代替內部資料結構。這就使得整數集合(intset)和壓縮(ziplist)這兩類節省記憶體的資料結構應運而生了。 # intset資料結構 Intset是集合鍵的底層實現之一,如果一個集合滿足只儲存整數元素和元素數量不多這兩個條件,那麼Redis就會採用intset來儲存這個資料集。intset的資料結構如下:
typedef struct intset {
    uint32_t encoding; // 編碼模式
    uint32_t length;  // 長度
int8_t contents[]; // 資料部分 } intset;
其中,encoding欄位表示該整數集合的編碼模式,Redis提供三種模式的巨集定義如下:
// 可以看出,雖然contents部分指明的型別是int8_t,但是資料並不以這個型別存放
// 資料以int16_t型別存放,每個佔2個位元組,能存放-32768~32767範圍內的整數
#define INTSET_ENC_INT16 (sizeof(int16_t)) 
// 資料以int32_t型別存放,每個佔4個位元組,能存放-2^32-1~2^32範圍內的整數
#define INTSET_ENC_INT32 (sizeof(int32_t)) 
// 資料以int64_t型別存放,每個佔8個位元組,能存放-2^64-1~2^64範圍內的整數 #define INTSET_ENC_INT64 (sizeof(int64_t))
length欄位用來儲存集合中元素的個數。 contents欄位用於儲存整數,陣列中的元素要求不含有重複的整數且按照從小到大的順序排列。在讀取和寫入的時候,均按照指定的encoding編碼模式讀取和寫入。 # 升級 inset中最值得一提的就是升級操作。當intset中新增的整數超過當前編碼型別的時候,intset會自定升級到能容納該整數型別的編碼模式,如1,2,3,4,建立該集合的時候,採用int16_t的型別儲存,現在需要像集合中新增一個整數40000,超出了當前集合能存放的最大範圍,這個時候就需要對該整數集合進行升級操作,將encoding欄位改成int32_6型別,並對contents欄位內的資料進行重排列。 Redis提供intsetUpgradeAndAdd函式來對整數集合進行升級然後新增資料。其升級過程可以參考如下圖示:

其原始碼如下:

// 升級整數集合並新增元素
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    // 獲取當前編碼格式
    uint8_t curenc = intrev32ifbe(is->encoding);
    // 獲取需要升級到的編碼格式
    uint8_t newenc = _intsetValueEncoding(value);
    // 獲取原整數集中的整數個數
    int length = intrev32ifbe(is->length);
    // 由於待新增的元素一定是大於或者小於整數集中所有元素,故此處需要判斷新增到新資料集的頭部或者尾部
    // 如果value為正,則新增到新資料集的尾部;反之則新增到首部
    int prepend = value < 0 ? 1 : 0;

    // 設定新的編碼格式
    is->encoding = intrev32ifbe(newenc);
    // 對原資料集進行擴容
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    // 採用從後往前的重編碼順序,這樣就避免覆蓋資料了。
    while(length--)
        // 將原資料集中的資料依次賦值到新資料集中
        // _intsetGetEncoded(is,length,curenc)獲取資料集is的第length位上的資料,curenc為原資料集的編碼格式
        // _intsetSet將資料集is的第length+prepend位上設定為上一函式返回的值
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    // 將待新增的資料新增到首部或者尾部
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    // 修改新資料集的長度
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
// 將value設定到整數集合is的第pos位
static void _intsetSet(intset *is, int pos, int64_t value) {
    // 獲取整數集合is的編碼格式
    uint32_t encoding = intrev32ifbe(is->encoding);
    // 針對不同的編碼格式做相應的處理
    if (encoding == INTSET_ENC_INT64) {
        // 將對應的pos位設定成value
        ((int64_t*)is->contents)[pos] = value;
        // 如果必要,對新值進行大小端轉換
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        // 同上
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        // 同上
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}
// 獲取整數集is中,按照enc編碼格式的第pos位上的元素
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    // 針對不同的編碼格式做相應的處理
    // (enc*)is->contents獲取整數集中的資料部分
    // (enc*)is->contents+pos獲取第pos位上的元素
    // memrevEncifbe(&vEnc)如有必要對拷貝出來的值進行大小端轉換
    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}

Redis不提供降級操作,所以一旦對陣列進行了升級,編碼就會一直保持升級後的狀態。

inset基本操作

建立intset

Redis在建立intset集合時,預設採用int16_t編碼格式。

intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

新增元素

intset在新增元素時需要判斷新資料的大小,如果超出原編碼格式能表示的範圍,則呼叫上面的intsetUpgradeAndAdd函式進行新增,如果沒有超出,則直接新增到指定位置。

// 向整數集合中新增元素
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    // 如果超出了當前編碼格式所能表示的範圍,則升級整數集合並新增元素
    if (valenc > intrev32ifbe(is->encoding)) {
        return intsetUpgradeAndAdd(is,value);
    } else {
        // 如果沒有超出,則計算待新增整數需要應新增到整數集合中的位置
        if (intsetSearch(is,value,&pos)) {
            // intset中應不存在相同元素,如果待新增的整數已存在,則直接返回
            if (success) *success = 0;
            return is;
        }
        // 調整整數集合的大小
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        // 將整數集合中pos~end的資料移動到pos+1~newend上
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
    // 新增資料到第pos位
    _intsetSet(is,pos,value);
    // 更新length值
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
// 查詢value在整數集is中該新增到的位置
// 如果整數集中不存在value值,則返回0,並將插入位置存放在pos變數中
// 反之,返回1,表示value已存在
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    // 判斷資料集是否為空
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        // 如果待查詢的數超出了整數集中現有元素的最大和最小範圍,則不需要查詢
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            // value大於整數集中的最大值,則插入到整數集末尾
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            // value小於整數集中的最小值,則插入到整數集頭部
            if (pos) *pos = 0;
            return 0;
        }
    }
    // 利用二分法進行查詢,時間複雜度為O(logn)
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        if (pos) *pos = min;
        return 0;
    }
}
// 將整數集的from為開始的資料全部移動到to位以後
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
    void *src, *dst;
    uint32_t bytes = intrev32ifbe(is->length)-from;
    uint32_t encoding = intrev32ifbe(is->encoding);
    // 根據編碼格式做相應處理
    // src為待移動記憶體的初始位置
    // dst為需要移動到的記憶體塊的初始位置
    // bytes為需要移動的位元組數
    if (encoding == INTSET_ENC_INT64) {
        src = (int64_t*)is->contents+from;
        dst = (int64_t*)is->contents+to;
        bytes *= sizeof(int64_t);
    } else if (encoding == INTSET_ENC_INT32) {
        src = (int32_t*)is->contents+from;
        dst = (int32_t*)is->contents+to;
        bytes *= sizeof(int32_t);
    } else {
        src = (int16_t*)is->contents+from;
        dst = (int16_t*)is->contents+to;
        bytes *= sizeof(int16_t);
    }
    memmove(dst,src,bytes);
}

移除資料

// 將整數集合中值為value的整數移除
intset *intsetRemove(intset *is, int64_t value, int *success) {
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 0;
    // 符合刪除條件的要求有兩條:
    // -- 值不能超出當前編碼格式能表示的範圍
    // -- 整數集中能找到該值
    if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) {
        uint32_t len = intrev32ifbe(is->length);

        // 表示能刪除
        if (success) *success = 1;

        // 移動記憶體,刪除資料
        if (pos < (len-1)) intsetMoveTail(is,pos+1,pos);
        // 調整記憶體大小
        is = intsetResize(is,len-1);
        // 更新length值
        is->length = intrev32ifbe(len-1);
    }
    return is;
}

其他操作函式

  • intsetFind 用二分法判斷給定值是否存在於集合中
  • intsetRandom 隨機返回整數集合中的一個數
  • intsetGet 取出底層屬豬在給定索引上的元素
  • intsetLen 返回整數集合中的元素個數
  • intsetloblen 返回整數集合佔用的記憶體位元組數

intset小結

整數集合intset的底層實現為陣列,該陣列中的元素有序、無重複的存放,為了更好的節省記憶體,intset提供了升級操作,但是不支援降級操作。intset的原始碼實現比較簡單,但功能上很實用。