Redis(2)——跳躍表
一、跳躍表簡介
跳躍表(skiplist)是一種隨機化的資料結構,由 William Pugh 在論文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一種可以於平衡樹媲美的層次化連結串列結構——查詢、刪除、新增等操作都可以在對數期望時間下完成,以下是一個典型的跳躍表例子:
我們在上一篇中提到了 Redis 的五種基本結構中,有一個叫做 有序列表 zset 的資料結構,它類似於 Java 中的 SortedSet 和 HashMap 的結合體,一方面它是一個 set 保證了內部 value 的唯一性,另一方面又可以給每個 value 賦予一個排序的權重值 score,來達到 排序 的目的。
它的內部實現就依賴了一種叫做 「跳躍列表」 的資料結構。
為什麼使用跳躍表
首先,因為 zset 要支援隨機的插入和刪除,所以它 不宜使用陣列來實現,關於排序問題,我們也很容易就想到 紅黑樹/ 平衡樹 這樣的樹形結構,為什麼 Redis 不使用這樣一些結構呢?
- 效能考慮: 在高併發的情況下,樹形結構需要執行一些類似於 rebalance 這樣的可能涉及整棵樹的操作,相對來說跳躍表的變化只涉及區域性 (下面詳細說);
- 實現考慮: 在複雜度與紅黑樹相同的情況下,跳躍表實現起來更簡單,看起來也更加直觀;
基於以上的一些考慮,Redis 基於 William Pugh 的論文做出一些改進後採用了 跳躍表 這樣的結構。
本質是解決查詢問題
我們先來看一個普通的連結串列結構:
我們需要這個連結串列按照 score 值進行排序,這也就意味著,當我們需要新增新的元素時,我們需要定位到插入點,這樣才可以繼續保證連結串列是有序的,通常我們會使用 二分查詢法,但二分查詢是有序陣列的,連結串列沒辦法進行位置定位,我們除了遍歷整個找到第一個比給定資料大的節點為止 (時間複雜度 O(n)) 似乎沒有更好的辦法。
但假如我們每相鄰兩個節點之間就增加一個指標,讓指標指向下一個節點,如下圖:
這樣所有新增的指標連成了一個新的連結串列,但它包含的資料卻只有原來的一半 (圖中的為 3,11)。
現在假設我們想要查詢資料時,可以根據這條新的連結串列查詢,如果碰到比待查詢資料大的節點時,再回到原來的連結串列中進行查詢,比如,我們想要查詢 7,查詢的路徑則是沿著下圖中標註出的紅色指標所指向的方向進行的:
這是一個略微極端的例子,但我們仍然可以看到,通過新增加的指標查詢,我們不再需要與連結串列上的每一個節點逐一進行比較,這樣改進之後需要比較的節點數大概只有原來的一半。
利用同樣的方式,我們可以在新產生的連結串列上,繼續為每兩個相鄰的節點增加一個指標,從而產生第三層連結串列:
在這個新的三層連結串列結構中,我們試著 查詢 13,那麼沿著最上層連結串列首先比較的是 11,發現 11 比 13 小,於是我們就知道只需要到 11 後面繼續查詢,從而一下子跳過了 11 前面的所有節點。
可以想象,當連結串列足夠長,這樣的多層連結串列結構可以幫助我們跳過很多下層節點,從而加快查詢的效率。
更進一步的跳躍表
跳躍表 skiplist 就是受到這種多層連結串列結構的啟發而設計出來的。按照上面生成連結串列的方式,上面每一層連結串列的節點個數,是下面一層的節點個數的一半,這樣查詢過程就非常類似於一個二分查詢,使得查詢的時間複雜度可以降低到 O(logn)。
但是,這種方法在插入資料的時候有很大的問題。新插入一個節點之後,就會打亂上下相鄰兩層連結串列上節點個數嚴格的 2:1 的對應關係。如果要維持這種對應關係,就必須把新插入的節點後面的所有節點 (也包括新插入的節點) 重新進行調整,這會讓時間複雜度重新蛻化成 O(n)。刪除資料也有同樣的問題。
skiplist 為了避免這一問題,它不要求上下相鄰兩層連結串列之間的節點個數有嚴格的對應關係,而是 為每個節點隨機出一個層數(level)。比如,一個節點隨機出的層數是 3,那麼就把它鏈入到第 1 層到第 3 層這三層連結串列中。為了表達清楚,下圖展示瞭如何通過一步步的插入操作從而形成一個 skiplist 的過程:
從上面的建立和插入的過程中可以看出,每一個節點的層數(level)是隨機出來的,而且新插入一個節點並不會影響到其他節點的層數,因此,插入操作只需要修改節點前後的指標,而不需要對多個節點都進行調整,這就降低了插入操作的複雜度。
現在我們假設從我們剛才建立的這個結構中查詢 23 這個不存在的數,那麼查詢路徑會如下圖:
二、跳躍表的實現
Redis 中的跳躍表由 server.h/zskiplistNode
和 server.h/zskiplist
兩個結構定義,前者為跳躍表節點,後者則儲存了跳躍節點的相關資訊,同之前的 集合 list
結構類似,其實只有 zskiplistNode
就可以實現了,但是引入後者是為了更加方便的操作:
/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
// value
sds ele;
// 分值
double score;
// 後退指標
struct zskiplistNode *backward;
// 層
struct zskiplistLevel {
// 前進指標
struct zskiplistNode *forward;
// 跨度
unsigned long span;
} level[];
} zskiplistNode;
typedef struct zskiplist {
// 跳躍表頭指標
struct zskiplistNode *header, *tail;
// 表中節點的數量
unsigned long length;
// 表中層數最大的節點的層數
int level;
} zskiplist;
正如文章開頭畫出來的那張標準的跳躍表那樣。
隨機層數
對於每一個新插入的節點,都需要呼叫一個隨機演算法給它分配一個合理的層數,原始碼在 t_zset.c/zslRandomLevel(void)
中被定義:
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
直觀上期望的目標是 50% 的概率被分配到 Level 1
,25% 的概率被分配到 Level 2
,12.5% 的概率被分配到 Level 3
,以此類推...有 2-63 的概率被分配到最頂層,因為這裡每一層的晉升率都是 50%。
Redis 跳躍表預設允許最大的層數是 32,被原始碼中 ZSKIPLIST_MAXLEVEL
定義,當 Level[0]
有 264 個元素時,才能達到 32 層,所以定義 32 完全夠用了。
建立跳躍表
這個過程比較簡單,在原始碼中的 t_zset.c/zslCreate
中被定義:
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
// 申請記憶體空間
zsl = zmalloc(sizeof(*zsl));
// 初始化層數為 1
zsl->level = 1;
// 初始化長度為 0
zsl->length = 0;
// 建立一個層數為 32,分數為 0,沒有 value 值的跳躍表頭節點
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
// 跳躍表頭節點初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
// 將跳躍表頭節點的所有前進指標 forward 設定為 NULL
zsl->header->level[j].forward = NULL;
// 將跳躍表頭節點的所有跨度 span 設定為 0
zsl->header->level[j].span = 0;
}
// 跳躍表頭節點的後退指標 backward 置為 NULL
zsl->header->backward = NULL;
// 表頭指向跳躍表尾節點的指標置為 NULL
zsl->tail = NULL;
return zsl;
}
即執行完之後建立瞭如下結構的初始化跳躍表:
插入節點實現
這幾乎是最重要的一段程式碼了,但總體思路也比較清晰簡單,如果理解了上面所說的跳躍表的原理,那麼很容易理清楚插入節點時發生的幾個動作 (幾乎跟連結串列類似):
- 找到當前我需要插入的位置 (其中包括相同 score 時的處理);
- 建立新節點,調整前後的指標指向,完成插入;
為了方便閱讀,我把原始碼 t_zset.c/zslInsert
定義的插入函式拆成了幾個部分
第一部分:宣告需要儲存的變數
// 儲存搜尋路徑
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 儲存經過的節點跨度
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;
第二部分:搜尋當前節點插入位置
serverAssert(!isnan(score));
x = zsl->header;
// 逐步降級尋找目標節點,得到 "搜尋路徑"
for (i = zsl->level-1; i >= 0; i--) {
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
// 如果 score 相等,還需要比較 value 值
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
rank[i] += x->level[i].span;
x = x->level[i].forward;
}
// 記錄 "搜尋路徑"
update[i] = x;
}
討論: 有一種極端的情況,就是跳躍表中的所有 score 值都是一樣,zset 的查詢效能會不會退化為 O(n) 呢?
從上面的原始碼中我們可以發現 zset 的排序元素不只是看 score 值,也會比較 value 值 (字串比較)
第三部分:生成插入節點
/* we assume the element is not already inside, since we allow duplicated
* scores, reinserting the same element should never happen since the
* caller of zslInsert() should test in the hash table if the element is
* already inside or not. */
level = zslRandomLevel();
// 如果隨機生成的 level 超過了當前最大 level 需要更新跳躍表的資訊
if (level > zsl->level) {
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
// 建立新節點
x = zslCreateNode(level,score,ele);
第四部分:重排前向指標
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
第五部分:重排後向指標並返回
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
節點刪除實現
刪除過程由原始碼中的 t_zset.c/zslDeleteNode
定義,和插入過程類似,都需要先把這個 "搜尋路徑" 找出來,然後對於每個層的相關節點重排一下前向後向指標,同時還要注意更新一下最高層數 maxLevel
,直接放原始碼 (如果理解了插入這裡還是很容易理解的):
/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {
update[i]->level[i].span += x->level[i].span - 1;
update[i]->level[i].forward = x->level[i].forward;
} else {
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
zsl->level--;
zsl->length--;
}
/* Delete an element with matching score/element from the skiplist.
* The function returns 1 if the node was found and deleted, otherwise
* 0 is returned.
*
* If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
* it is not freed (but just unlinked) and *node is set to the node pointer,
* so that it is possible for the caller to reuse the node (including the
* referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
x = x->level[i].forward;
}
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;
if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
zslDeleteNode(zsl, x, update);
if (!node)
zslFreeNode(x);
else
*node = x;
return 1;
}
return 0; /* not found */
}
節點更新實現
當我們呼叫 ZADD
方法時,如果對應的 value 不存在,那就是插入過程,如果這個 value 已經存在,只是調整一下 score 的值,那就需要走一個更新流程。
假設這個新的 score 值並不會帶來排序上的變化,那麼就不需要調整位置,直接修改元素的 score 值就可以了,但是如果排序位置改變了,那就需要調整位置,該如何調整呢?
從原始碼 t_zset.c/zsetAdd
函式 1350
行左右可以看到,Redis 採用了一個非常簡單的策略:
/* Remove and re-insert when score changed. */
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*flags |= ZADD_UPDATED;
}
把這個元素刪除再插入這個,需要經過兩次路徑搜尋,從這一點上來看,Redis 的 ZADD
程式碼似乎還有進一步優化的空間。
元素排名的實現
跳躍表本身是有序的,Redis 在 skiplist 的 forward 指標上進行了優化,給每一個 forward 指標都增加了 span
屬性,用來 表示從前一個節點沿著當前層的 forward 指標跳到當前這個節點中間會跳過多少個節點。在上面的原始碼中我們也可以看到 Redis 在插入、刪除操作時都會小心翼翼地更新 span
值的大小。
所以,沿著 "搜尋路徑",把所有經過節點的跨度 span
值進行累加就可以算出當前元素的最終 rank 值了:
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
// span 累加
rank += x->level[i].span;
x = x->level[i].forward;
}
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}
擴充套件閱讀
- 跳躍表 Skip List 的原理和實現(Java) - https://blog.csdn.net/DERRANTCM/article/details/79063312
- 【演算法導論33】跳躍表(Skip list)原理與java實現 - https://blog.csdn.net/brillianteagle/article/details/52206261
參考資料
- 《Redis 設計與實現》 - http://redisbook.com/
- 【官方文件】Redis 資料型別介紹 - http://www.redis.cn/topics/data-types-intro.html
- 《Redis 深度歷險》 - https://book.douban.com/subject/30386804/
- Redis 原始碼 - https://github.com/antirez/redis
- Redis 快速入門 - 易百教程 - https://www.yiibai.com/redis/redis_quick_guide.html
- Redis【入門】就這一篇! - https://www.wmyskxz.com/2018/05/31/redis-ru-men-jiu-zhe-yi-pian/
- Redis為什麼用跳錶而不用平衡樹? - https://mp.weixin.qq.com/s?__biz=MzA4NTg1MjM0Mg==&mid=2657261425&idx=1&sn=d840079ea35875a8c8e02d9b3e44cf95&scene=21#wechat_redirect
- 為啥 redis 使用跳錶(skiplist)而不是使用 red-black? - 知乎@於康 - https://www.zhihu.com/question/20202931
- 本文已收錄至我的 Github 程式設計師成長系列 【More Than Java】,學習,不止 Code,歡迎 star:https://github.com/wmyskxz/MoreThanJava
- 個人公眾號 :wmyskxz,個人獨立域名部落格:wmyskxz.com,堅持原創輸出,下方掃碼關注,2020,與您共同成長!
非常感謝各位人才能 看到這裡,如果覺得本篇文章寫得不錯,覺得 「我沒有三顆心臟」有點東西 的話,求點贊,求關注,求分享,求留言!
創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文章見