Redis底層詳解(四) 整數集合
一、集合概述
對於集合,STL 的 set 相信大家都不陌生,它的底層實現是紅黑樹。無論插入、刪除、查詢都是 O(log n) 的時間複雜度。當然,如果用雜湊表來實現集合,插入、刪除、查詢都可以達到 O(1)。那麼為什麼集合要用紅黑樹和沒有用雜湊表呢?我想,最大的可能是基於集合自身的特性,集合有它特有的操作:求交、求並、求差。這三個操作對於雜湊表來說都是 O(n) 的。基於這一點,相比無序的雜湊表來說,採用有序的紅黑樹會更加合適。
二、Redis 整數集合(intset)
今天要講的整數集合,又稱為 intset,是 Redis 特有的資料結構。它的實現既不是紅黑樹,也不是雜湊表。就是簡單的陣列加上記憶體編碼。當儲存元素較少( 元素個數上限定義在server.h 的 OBJ_SET_MAX_INTSET_ENTRIES 巨集定義值為512)且均為整型時,才會使用到整數集合。它的查詢是 O(log n) 的,插入和刪除都是 O(n) 的。但是由於儲存元素相對較少的時候,O(log n) 和 O(n) 差距不是很大,但是用 Redis 的這種整數集合,相比紅黑樹和雜湊表來說,可以大大減少記憶體。
所以,Redis 的 整數集合 intset 的存在主要還是為了節省記憶體。
1、intset 結構定義
intset 結構定義在 intset.h 中:
#define INTSET_ENC_INT16 (sizeof(int16_t)) #define INTSET_ENC_INT32 (sizeof(int32_t)) #define INTSET_ENC_INT64 (sizeof(int64_t)) typedef struct intset { uint32_t encoding; /* a */ uint32_t length; /* b */ int8_t contents[]; /* c */ } intset;
a) encoding 指定了編碼方式,總共有 INTSET_ENC_INT16、INTSET_ENC_INT32、INTSET_ENC_INT64 三種。從巨集定義可以看出,這三個值分別為 2、4、8。從字面意思可以看出三者能表示的範圍是 16位整數、32位整數 以及 64位整數。
b) length 儲存了整數集合的元素個數。
c) contents 為整數集合的柔性陣列,元素型別並不一定是 int8_t 型別的。 contents 不佔用結構體的大小,它只作為整數集合資料的首指標。整數集合中的元素按照從小到大的順序在 contents 中排列起來。
2、編碼方式
首先,我們來理解編碼方式 encoding 的含義。需要明確的一點是,對於一個整數集合來說,所有的元素的編碼一定是一致的(否則每個數都得存一個編碼,而不是將它存在 intset 結構體內了),那麼整個整數集合的編碼取決於集合中“絕對值”最大的那個數(之所以是絕對值,因為整數包含正數和負數)。
通過那個絕對值最大的整數來獲取編碼,實現如下:
static uint8_t _intsetValueEncoding(int64_t v) {
if (v < INT32_MIN || v > INT32_MAX)
return INTSET_ENC_INT64;
else if (v < INT16_MIN || v > INT16_MAX)
return INTSET_ENC_INT32;
else
return INTSET_ENC_INT16;
}
這段程式碼的含義是,如果整數 v 不能用 32位整數表示,那麼就需要用 INTSET_ENC_INT64 編碼;如果不能用 16位整數表示,那麼就需要用 INTSET_ENC_INT32 編碼;否則,採用 INTSET_ENC_INT16 編碼就行。核心就是:能用2個位元組表示就不用4個位元組,能用4個位元組表示就不用8個位元組,能省則省。
幾個巨集定義在 stdint.h 中,如下:
/* Minimum of signed integral types. */
# define INT16_MIN (-32767-1)
# define INT32_MIN (-2147483647-1)
/* Maximum of signed integral types. */
# define INT16_MAX (32767)
# define INT32_MAX (2147483647)
3、編碼升級
當前編碼方式不足以儲存更大位數的整數時,需要升級編碼。舉個例子,下圖所示的四個數字都在 [ -32768, 32767 ] 範圍內,所以採用 INTSET_ENC_INT16 編碼即可。contents 的陣列長度為 sizeof(int16_t) * 4 = 2 * 4 = 8 個位元組 ( 即64個二進位制位 )。
然後我們插入一個數,它的值為 32768,比 INT16_MAX 大1,所以它需要採用 INTSET_ENC_INT32 編碼,而整數集合中所有的數的編碼需要保持一致。那麼,所有數的編碼都需要轉為 INTSET_ENC_INT32 編碼。這就是 “升級”。如圖所示:
升級完後,contents 陣列的長度變為 sizeof(int32_t) * 5 = 4 * 5 = 20 個位元組 ( 即160個二進位制位 )。而且每個元素佔用的記憶體都擴大一倍,所在的相對位置也發生了變化,導致所有的元素都需要往高位記憶體遷移。
那我們一開始就把所有的整數集合都用 INTSET_ENC_INT64 來編碼不就好了,還省得麻煩。原因是 Redis 設計 intset 的初衷還是為了節省記憶體,設想一個集合的元素永遠都不會超過 16位 整數,那麼用 64位整數的話,相當於浪費了 3倍 的記憶體。
三、整數集合常用操作
1、建立集合
建立一個整數集合 intsetNew,實現在 intset.c 中:
intset *intsetNew(void) {
intset *is = zmalloc(sizeof(intset));
is->encoding = intrev32ifbe(INTSET_ENC_INT16);
is->length = 0;
return is;
}
初始建立的整數集合為空集合,用 zmalloc 進行記憶體分配後,定義編碼為 INTSET_ENC_INT16,這樣可以使記憶體儘量小。這裡需要注意的是,intset 的儲存直接涉及到記憶體編碼,所以需要考慮主機的位元組序問題(相關資料請參閱:位元組序)。
intrev32ifbe 的意思是 int32 reversal if big endian。即 如果當前主機位元組序為大端序,那麼將它的記憶體儲存進行翻轉操作。簡言之,intset 的所有成員儲存方式都採用小端序。所以建立一個空的整數集合,記憶體分佈如下:
瞭解了整數集合的記憶體編碼以後,我們來看看它的 設定 (set)和 獲取(get)。
2、元素設定
設定 的含義就是給定整數集合以及一個位置和值,將值設定到這個整數集合的對應位置上。_intsetSet 實現如下:
static void _intsetSet(intset *is, int pos, int64_t value) {
uint32_t encoding = intrev32ifbe(is->encoding); /* a */
if (encoding == INTSET_ENC_INT64) {
((int64_t*)is->contents)[pos] = value; /* b */
memrev64ifbe(((int64_t*)is->contents)+pos); /* c */
} else if (encoding == INTSET_ENC_INT32) {
((int32_t*)is->contents)[pos] = value;
memrev32ifbe(((int32_t*)is->contents)+pos);
} else {
((int16_t*)is->contents)[pos] = value;
memrev16ifbe(((int16_t*)is->contents)+pos);
}
}
a) 大端序和小端序只是儲存方式,encoding 在儲存的時候進行了一次 intrev32ifbe 轉換,取出來用的時候需要再進行一次 intrev32ifbe 轉換(其實就是序列化和反序列化)。
b) 根據 encoding 的型別,將 contents 轉換成指定型別的指標,然後用 pos 進行索引找到對應的記憶體位置,然後將 value 的值設定到對應的記憶體中。
c) memrev64ifbe 的實現參見 位元組序 的 memrev64 函式,即將對應記憶體的值轉換成小端序儲存。
3、元素獲取
獲取 的含義就是給定整數集合以及一個位置,返回給定位置的元素的值。_intsetGet 實現如下:
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
int64_t v64;
int32_t v32;
int16_t v16;
if (enc == INTSET_ENC_INT64) {
memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64)); /* a */
memrev64ifbe(&v64); /* b */
return v64;
} else if (enc == INTSET_ENC_INT32) {
memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
memrev32ifbe(&v32);
return v32;
} else {
memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
memrev16ifbe(&v16);
return v16;
}
}
static int64_t _intsetGet(intset *is, int pos) {
return _intsetGetEncoded(is,pos,intrev32ifbe(is->encoding));
}
a) 根據 encoding 的型別,將 contents 轉換成指定型別的指標,然後用 pos 進行索引找到對應的記憶體位置,將記憶體位置上的值拷貝到臨時變數中;
b) 由於是直接的記憶體拷貝,所以取出來的值還是小端序的,那麼在大端序的主機上得到的值是不對的,所以需要再做一次 memrev64ifbe 轉換將值還原。
4、元素查詢
由於整數集合是有序集合,所以查詢某個元素是否在整數集合中,Redis 採用的是二分查詢。intsetSearch 實現如下:
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0; /* a */
return 0;
} else { /* b */
if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
return 0;
}
}
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1; /* c */
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
}
}
if (value == cur) { /* d */
if (pos) *pos = mid;
return 1;
} else {
if (pos) *pos = min;
return 0;
}
}
a) 整數集合為空,返回0表示查詢失敗;
b) value 的值比整數集合中的最大值還大,或者比最小值還小,則返回0表示查詢失敗;
c) 執行二分查詢,將找到的值存在 cur 中;
d) 如果找到則返回1,表示查詢成功,並且將 pos 設定為 mid 並返回;如果沒找到則返回一個需要插入的位置。
5、記憶體重分配
由於 contents 的記憶體是動態分配的,所以每次進行元素插入或者刪除的時候,都需要重新分配記憶體,這個實現放在 intsetResize 中,實現如下:
static intset *intsetResize(intset *is, uint32_t len) {
uint32_t size = len*intrev32ifbe(is->encoding);
is = zrealloc(is,sizeof(intset)+size);
return is;
}
encoding 本身表示位元組個數,所以乘上集合個數 len 就是 contents 陣列需要的總位元組數了,呼叫 zrealloc 進行記憶體重分配,然後返回重新分配後的地址。
注意:zrealloc 的返回值必須返回出去,因為 intset 在進行記憶體重分配以後,地址可能就變了。即 is = zrealloc(is, ...) 中,此 is 非彼 is。所以,所有呼叫 intsetResize 的函式都需要連帶的返回新的 intset 指標。
6、編碼升級
編碼升級一定發生在元素插入,並且插入的元素的絕對值比整數集合中的元素都大的時候,所以我們把升級後的元素插入和編碼升級放在一個函式實現,名曰 intsetUpgradeAndAdd,實現如下:
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
uint8_t curenc = intrev32ifbe(is->encoding);
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0; /* a */
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1); /* b */
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc)); /* c */
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value); /* d */
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}
a) curenc 記錄升級前的編碼,newenc 記錄升級後的編碼;
b) 將整數集合 is 的編碼設定成新的編碼後,進行記憶體重分配;
c) 獲取原先記憶體中的資料,設定到新記憶體中(注意:由於兩段記憶體空間是重疊的,而且新記憶體的長度一定大於原先記憶體,所以需要從後往前進行拷貝);
d) 當插入的值 value 為負數的時候,為了保證集合的有序性,需要插入到 contents 的頭部;反之,插入到尾部;當 value 為負數時 prepend 為1,這樣就可以保證在記憶體拷貝的時候將第 0 個位置留空。
如圖展示了一個 (-32768, 0, 1, 32767) 的整數集合在插入數字 32768 後的升級的完整過程:
整數集合升級的時間複雜度是 O(n) 的,但是在整數集合的生命期內,升級最多發生兩次(從 INTSET_ENC_INT16 到 INTSET_ENC_INT32 以及 從 INTSET_ENC_INT32 到 INTSET_ENC_INT64)。
7、記憶體遷移
絕大多數情況都是在執行 插入 、刪除 、查詢 操作。插入 和 刪除 會涉及到連續記憶體的移動。Redis 的內部實現中有一個函式 intsetMoveTail 就是用來實現記憶體移動的。
static void intsetMoveTail(intset *is, uint32_t from, uint32_t to) {
void *src, *dst;
uint32_t bytes = intrev32ifbe(is->length)-from; /* a */
uint32_t encoding = intrev32ifbe(is->encoding);
if (encoding == INTSET_ENC_INT64) {
src = (int64_t*)is->contents+from;
dst = (int64_t*)is->contents+to;
bytes *= sizeof(int64_t); /* b */
} else if (encoding == INTSET_ENC_INT32) {
src = (int32_t*)is->contents+from;
dst = (int32_t*)is->contents+to;
bytes *= sizeof(int32_t);
} else {
src = (int16_t*)is->contents+from;
dst = (int16_t*)is->contents+to;
bytes *= sizeof(int16_t);
}
memmove(dst,src,bytes); /* c */
}
a) 統計從 from 到結尾,有多少個元素;
b) 根據不同的編碼,計算出需要拷貝的記憶體位元組數 bytes,以及拷貝源位置 src,拷貝目標位置 dst;
c) memmove 是 string.h 中的函式:src指向的記憶體區域拷貝 bytes 個位元組到 dst 所指向的記憶體區域,這個函式是支援記憶體重疊的;
8、元素插入
最後,講整數集合的插入和刪除,插入呼叫的是 intsetAdd,在 intset.c 中實現:
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
if (valenc > intrev32ifbe(is->encoding)) { /* a */
return intsetUpgradeAndAdd(is,value);
} else {
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0; /* b */
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1); /* c */
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); /* d */
}
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1); /* e */
return is;
}
a) 插入的數值 value 的記憶體編碼大於現有集合的編碼,直接呼叫 intsetUpgradeAndAdd 進行編碼升級;
b) 集合元素是不重複的,如果 intsetSearch 能夠找到,則將 success 置為0,表示此次插入失敗;
c) 如果 intsetSearch 找不到,將 intset 進行記憶體重分配,即 長度 加 1。
d) pos 為 intsetSearch 過程中找到的 value 將要插入的位置,我們將 pos 以後的記憶體向後移動1個單位 (這裡的1個單位可能是2個位元組、4個位元組或者8個位元組,取決於當前整數集合的記憶體編碼)。
e) 呼叫 _intsetSet 將 value 的值設定到 pos 的位置上,然後給成員變數 length 加 1。最後返回 intset 指標首地址,因為其間進行了 intsetResize,傳入的 intset 指標和返回的有可能不是同一個了。
9、元素刪除
刪除元素呼叫的是 intsetRemove ,實現如下:
intset *intsetRemove(intset *is, int64_t value, int *success) {
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 0;
if (valenc <= intrev32ifbe(is->encoding) && intsetSearch(is,value,&pos)) { /* a */
uint32_t len = intrev32ifbe(is->length);
if (success) *success = 1;
if (pos < (len-1)) intsetMoveTail(is,pos+1,pos); /* b */
is = intsetResize(is,len-1); /* c */
is->length = intrev32ifbe(len-1);
}
return is;
}
a) 當整數集合中存在 value 這個元素時才能執行刪除操作;
b) 如果能通過 intsetSearch 找到元素,那麼它的位置就在 pos 上,這是通過 intsetMoveTail 將記憶體往前挪;
c) intsetResize 重新分配記憶體,並且將集合長度減1;