redis原始碼分析與思考(十七)——有序集合型別的命令實現(t_zset.c)
阿新 • • 發佈:2018-11-12
有序集合是集合的延伸,它儲存著集合元素的不可重複性,但不同的是,它是有序的,它利用每一個元素的分數來作為有序集合的排序依據,現在列出有序集合的命令:
有序集合命令
命令 | 對應操作 | 時間複雜度 |
---|---|---|
zadd key score member [score member…] | 新增成員 | O(n) |
zcard key | 計算成員個數 | O(1) |
zscore key member | 計算成員的分數 | O(1) |
zrank key member | 計算成員的排名 | O(logn) |
zrem key member [member…] | 刪除成員 | O(logn) |
zincrby key increment member | 增加成員的分數 | O(logn) |
zrange key start end [withscores] | 返回指定排名的成員 | O(logn) |
zrangebyscore key min max [withscores] | 返回指定分數範圍內的成員 | O(logn) |
zcount key min max | 返回指定分數範圍內的成員個數 | O(logn) |
zremrangebyrank key start end | 刪除第start到第end名的成員 | O(logn) |
zremrangebyscore key min max | 刪除指定分數範圍內的成員 | O(logn) |
zinterstore destination numkeys key [key…] | 計算交集儲存在目標鍵 | O(logn) |
zunionstore destination numkeys key [key…] | 就算並集儲存在目標鍵 | O(logn) |
與集合型別一樣,有序集合也只講交集與並集以及編碼的轉換,有序集合的其它操作可參照之前寫的壓縮列表與跳躍表的實現。
編碼的轉換
當有序集合裡面的元素個數小於預設的128個,且每個元素的值都小於64位元組的時候,redis會採用壓縮列表來實現存貯,不滿足上述任意情況都將實行編碼轉換,因為那時壓縮列表的效率會變低。但是與集合不同的是,編碼轉換會迴轉,即有序集合滿足上述情況下,跳躍表編碼的有序集合會回到壓縮列表編碼的有序集合:
/*
* 將跳躍表物件 zobj 的底層編碼轉換為 encoding 。
*/
void zsetConvert(robj *zobj, int encoding) {
zset *zs;
zskiplistNode *node, *next;
robj *ele;
double score;
if (zobj->encoding == encoding) return;
// 從 ZIPLIST 編碼轉換為 SKIPLIST 編碼
if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (encoding != REDIS_ENCODING_SKIPLIST)
redisPanic("Unknown target encoding");
// 建立有序集合結構
zs = zmalloc(sizeof(*zs));
// 字典
zs->dict = dictCreate(&zsetDictType,NULL);
// 跳躍表
zs->zsl = zslCreate();
// 有序集合在 ziplist 中的排列:
// 指向 ziplist 中的首個節點(儲存著元素成員)
eptr = ziplistIndex(zl,0);
redisAssertWithInfo(NULL,zobj,eptr != NULL);
// 指向 ziplist 中的第二個節點(儲存著元素分值)
sptr = ziplistNext(zl,eptr);
redisAssertWithInfo(NULL,zobj,sptr != NULL);
// 遍歷所有 ziplist 節點,並將元素的成員和分值新增到有序集合中
while (eptr != NULL) {
// 取出分值
score = zzlGetScore(sptr);
// 取出成員
redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
if (vstr == NULL)
ele = createStringObjectFromLongLong(vlong);
else
ele = createStringObject((char*)vstr,vlen);
// 將成員和分值分別關聯到跳躍表和字典中
node = zslInsert(zs->zsl,score,ele);
redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
incrRefCount(ele); /* Added to dictionary. */
// 移動指標,指向下個元素
zzlNext(zl,&eptr,&sptr);
}
// 釋放原來的 ziplist
zfree(zobj->ptr);
// 更新物件的值,以及編碼方式
zobj->ptr = zs;
zobj->encoding = REDIS_ENCODING_SKIPLIST;
// 從 SKIPLIST 轉換為 ZIPLIST 編碼
} else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
// 新的 ziplist
unsigned char *zl = ziplistNew();
if (encoding != REDIS_ENCODING_ZIPLIST)
redisPanic("Unknown target encoding");
// 指向跳躍表
zs = zobj->ptr;
// 先釋放字典,因為只需要跳躍表就可以遍歷整個有序集合了
dictRelease(zs->dict);
// 指向跳躍表首個節點
node = zs->zsl->header->level[0].forward;
// 釋放跳躍表表頭
zfree(zs->zsl->header);
zfree(zs->zsl);
// 遍歷跳躍表,取出裡面的元素,並將它們新增到 ziplist
while (node) {
// 取出解碼後的值物件
ele = getDecodedObject(node->obj);
// 新增元素到 ziplist
zl = zzlInsertAt(zl,NULL,ele,node->score);
decrRefCount(ele);
// 沿著跳躍表的第 0 層前進
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
// 釋放跳躍表
zfree(zs);
// 更新物件的值,以及物件的編碼方式
zobj->ptr = zl;
zobj->encoding = REDIS_ENCODING_ZIPLIST;
} else {
redisPanic("Unknown sorted set encoding");
}
}
交集、並集
有序集合的交集與並集的底層實現也在一個函式裡面,與集合的交併集計算原理差不多,多了權值的計算,並集的時候沒有演算法的選擇:
/**
*
* @param c 客戶端
* @param dstkey 儲存的目標有序集合鍵
* @param op 判斷交集還是並集
*/
void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
int i, j;
long setnum;
int aggregate = REDIS_AGGR_SUM;
zsetopsrc *src;
zsetopval zval;
robj *tmp;
unsigned int maxelelen = 0;
robj *dstobj;
zset *dstzset;
zskiplistNode *znode;
int touched = 0;
/* expect setnum input keys to be given */
// 取出要處理的有序集合的個數 setnum
if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
return;
if (setnum < 1) {
//向客戶端返回錯誤資訊
addReplyError(c,
"at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
return;
}
/* test if the expected number of keys would overflow */
// setnum 引數和傳入的 key 數量不相同,出錯
if (setnum > c->argc-3) {
addReply(c,shared.syntaxerr);
return;
}
/* read keys to be used for input */
// 為每個輸入 key 建立一個迭代器
src = zcalloc(sizeof(zsetopsrc) * setnum);
for (i = 0, j = 3; i < setnum; i++, j++) {
// 取出 key 物件
robj *obj = lookupKeyWrite(c->db,c->argv[j]);
// 建立迭代器
if (obj != NULL) {
if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
zfree(src);
addReply(c,shared.wrongtypeerr);
return;
}
src[i].subject = obj;
src[i].type = obj->type;
src[i].encoding = obj->encoding;
// 不存在的物件設為 NULL
} else {
src[i].subject = NULL;
}
/* Default all weights to 1. */
// 預設權重為 1.0
src[i].weight = 1.0;
}
/* parse optional extra arguments */
// 分析並讀入可選引數
if (j < c->argc) {
int remaining = c->argc - j;
while (remaining) {
if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
j++; remaining--;
// 權重引數
for (i = 0; i < setnum; i++, j++, remaining--) {
if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
"weight value is not a float") != REDIS_OK)
{
zfree(src);
return;
}
}
} else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
j++; remaining--;
// 判斷聚合方式
if (!strcasecmp(c->argv[j]->ptr,"sum")) {
aggregate = REDIS_AGGR_SUM;
} else if (!strcasecmp(c->argv[j]->ptr,"min")) {
aggregate = REDIS_AGGR_MIN;
} else if (!strcasecmp(c->argv[j]->ptr,"max")) {
aggregate = REDIS_AGGR_MAX;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
j++; remaining--;
} else {
zfree(src);
addReply(c,shared.syntaxerr);
return;
}
}
}
/* sort sets from the smallest to largest, this will improve our
* algorithm's performance */
// 對所有集合進行排序,以減少演算法的常數項
qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
// 建立結果集物件
dstobj = createZsetObject();
dstzset = dstobj->ptr;
memset(&zval, 0, sizeof(zval));
// ZINTERSTORE 命令
if (op == REDIS_OP_INTER) {
/* Skip everything if the smallest input is empty. */
// 只處理非空集合
if (zuiLength(&src[0]) > 0) {
/* Precondition: as src[0] is non-empty and the inputs are ordered
* by size, all src[i > 0] are non-empty too. */
// 遍歷基數最小的 src[0] 集合
zuiInitIterator(&src[0]);
while (zuiNext(&src[0],&zval)) {
double score, value;
// 計算加權分值
score = src[0].weight * zval.score;
if (isnan(score)) score = 0;
// 將 src[0] 集合中的元素和其他集合中的元素做加權聚合計算
for (j = 1; j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
// 如果當前迭代到的 src[j] 的物件和 src[0] 的物件一樣,
// 那麼 src[0] 出現的元素必然也出現在 src[j]
// 那麼我們可以直接計算聚合值,
// 不必進行 zuiFind 去確保元素是否出現
// 這種情況在某個 key 輸入了兩次,
// 並且這個 key 是所有輸入集合中基數最小的集合時會出現
if (src[j].subject == src[0].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
// 如果能在其他集合找到當前迭代到的元素的話
// 那麼進行聚合計算
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
// 如果當前元素沒出現在某個集合,那麼跳出 for 迴圈
// 處理下個元素
} else {
break;
}
}
/* Only continue when present in every input. */
// 只在交集元素出現時,才執行以下程式碼
if (j == setnum) {
// 取出值物件
tmp = zuiObjectFromValue(&zval);
// 加入到有序集合中
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(tmp); /* added to skiplist */
// 加入到字典中
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(tmp); /* added to dictionary */
// 更新字串物件的最大長度
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}
}
zuiClearIterator(&src[0]);
}
// ZUNIONSTORE
} else if (op == REDIS_OP_UNION) {
// 遍歷所有輸入集合
for (i = 0; i < setnum; i++) {
// 跳過空集合
if (zuiLength(&src[i]) == 0)
continue;
// 遍歷所有集合元素
zuiInitIterator(&src[i]);
while (zuiNext(&src[i],&zval)) {
double score, value;
/* Skip an element that when already processed */
// 跳過已處理元素
if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
continue;
/* Initialize score */
// 初始化分值
score = src[i].weight * zval.score;
// 溢位時設為 0
if (isnan(score)) score = 0;
/* We need to check only next sets to see if this element
* exists, since we process every element just one time so
* it can't exist in a previous set (otherwise it would be
* already processed). */
for (j = (i+1); j < setnum; j++) {
/* It is not safe to access the zset we are
* iterating, so explicitly check for equal object. */
// 當前元素的集合和被迭代集合一樣
// 所以同一個元素必然出現在 src[j] 和 src[i]
// 程式直接計算它們的聚合值
// 而不必使用 zuiFind 來檢查元素是否存在
if(src[j].subject == src[i].subject) {
value = zval.score*src[j].weight;
zunionInterAggregate(&score,value,aggregate);
// 檢查成員是否存在
} else if (zuiFind(&src[j],&zval,&value)) {
value *= src[j].weight;
zunionInterAggregate(&score,value,aggregate);
}
}
// 取出成員
tmp = zuiObjectFromValue(&zval);
// 插入並集元素到跳躍表
znode = zslInsert(dstzset->zsl,score,tmp);
incrRefCount(zval.ele); /* added to skiplist */
// 新增元素到字典
dictAdd(dstzset->dict,tmp,&znode->score);
incrRefCount(zval.ele); /* added to dictionary */
// 更新字串最大長度
if (sdsEncodedObject(tmp)) {
if (sdslen(tmp->ptr) > maxelelen)
maxelelen = sdslen(tmp->ptr);
}
}
zuiClearIterator(&src[i]);
}
} else {
redisPanic("Unknown operator");
}
// 刪除已存在的 dstkey ,等待後面用新物件代替它
if (dbDelete(c->db,dstkey)) {
signalModifiedKey(c->db,dstkey);
touched = 1;
server.dirty++;
}
// 如果結果集合的長度不為 0
if (dstzset->zsl->length) {
/* Convert to ziplist when in limits. */
// 看是否需要對結果集合進行編碼轉換
if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&