1. 程式人生 > >Redis(2)——跳躍表

Redis(2)——跳躍表

一、跳躍表簡介

跳躍表(skiplist)是一種隨機化的資料結構,由 William Pugh 在論文《Skip lists: a probabilistic alternative to balanced trees》中提出,是一種可以於平衡樹媲美的層次化連結串列結構——查詢、刪除、新增等操作都可以在對數期望時間下完成,以下是一個典型的跳躍表例子:

我們在上一篇中提到了 Redis 的五種基本結構中,有一個叫做 有序列表 zset 的資料結構,它類似於 Java 中的 SortedSet 和 HashMap 的結合體,一方面它是一個 set 保證了內部 value 的唯一性,另一方面又可以給每個 value 賦予一個排序的權重值 score,來達到 排序 的目的。

它的內部實現就依賴了一種叫做 「跳躍列表」 的資料結構。

為什麼使用跳躍表

首先,因為 zset 要支援隨機的插入和刪除,所以它 不宜使用陣列來實現,關於排序問題,我們也很容易就想到 紅黑樹/ 平衡樹 這樣的樹形結構,為什麼 Redis 不使用這樣一些結構呢?

  1. 效能考慮: 在高併發的情況下,樹形結構需要執行一些類似於 rebalance 這樣的可能涉及整棵樹的操作,相對來說跳躍表的變化只涉及區域性 (下面詳細說)
  2. 實現考慮: 在複雜度與紅黑樹相同的情況下,跳躍表實現起來更簡單,看起來也更加直觀;

基於以上的一些考慮,Redis 基於 William Pugh 的論文做出一些改進後採用了 跳躍表 這樣的結構。

本質是解決查詢問題

我們先來看一個普通的連結串列結構:

我們需要這個連結串列按照 score 值進行排序,這也就意味著,當我們需要新增新的元素時,我們需要定位到插入點,這樣才可以繼續保證連結串列是有序的,通常我們會使用 二分查詢法,但二分查詢是有序陣列的,連結串列沒辦法進行位置定位,我們除了遍歷整個找到第一個比給定資料大的節點為止 (時間複雜度 O(n)) 似乎沒有更好的辦法。

但假如我們每相鄰兩個節點之間就增加一個指標,讓指標指向下一個節點,如下圖:

這樣所有新增的指標連成了一個新的連結串列,但它包含的資料卻只有原來的一半 (圖中的為 3,11)

現在假設我們想要查詢資料時,可以根據這條新的連結串列查詢,如果碰到比待查詢資料大的節點時,再回到原來的連結串列中進行查詢,比如,我們想要查詢 7,查詢的路徑則是沿著下圖中標註出的紅色指標所指向的方向進行的:

這是一個略微極端的例子,但我們仍然可以看到,通過新增加的指標查詢,我們不再需要與連結串列上的每一個節點逐一進行比較,這樣改進之後需要比較的節點數大概只有原來的一半。

利用同樣的方式,我們可以在新產生的連結串列上,繼續為每兩個相鄰的節點增加一個指標,從而產生第三層連結串列:

在這個新的三層連結串列結構中,我們試著 查詢 13,那麼沿著最上層連結串列首先比較的是 11,發現 11 比 13 小,於是我們就知道只需要到 11 後面繼續查詢,從而一下子跳過了 11 前面的所有節點。

可以想象,當連結串列足夠長,這樣的多層連結串列結構可以幫助我們跳過很多下層節點,從而加快查詢的效率。

更進一步的跳躍表

跳躍表 skiplist 就是受到這種多層連結串列結構的啟發而設計出來的。按照上面生成連結串列的方式,上面每一層連結串列的節點個數,是下面一層的節點個數的一半,這樣查詢過程就非常類似於一個二分查詢,使得查詢的時間複雜度可以降低到 O(logn)

但是,這種方法在插入資料的時候有很大的問題。新插入一個節點之後,就會打亂上下相鄰兩層連結串列上節點個數嚴格的 2:1 的對應關係。如果要維持這種對應關係,就必須把新插入的節點後面的所有節點 (也包括新插入的節點) 重新進行調整,這會讓時間複雜度重新蛻化成 O(n)。刪除資料也有同樣的問題。

skiplist 為了避免這一問題,它不要求上下相鄰兩層連結串列之間的節點個數有嚴格的對應關係,而是 為每個節點隨機出一個層數(level)。比如,一個節點隨機出的層數是 3,那麼就把它鏈入到第 1 層到第 3 層這三層連結串列中。為了表達清楚,下圖展示瞭如何通過一步步的插入操作從而形成一個 skiplist 的過程:

從上面的建立和插入的過程中可以看出,每一個節點的層數(level)是隨機出來的,而且新插入一個節點並不會影響到其他節點的層數,因此,插入操作只需要修改節點前後的指標,而不需要對多個節點都進行調整,這就降低了插入操作的複雜度。

現在我們假設從我們剛才建立的這個結構中查詢 23 這個不存在的數,那麼查詢路徑會如下圖:

二、跳躍表的實現

Redis 中的跳躍表由 server.h/zskiplistNodeserver.h/zskiplist 兩個結構定義,前者為跳躍表節點,後者則儲存了跳躍節點的相關資訊,同之前的 集合 list 結構類似,其實只有 zskiplistNode 就可以實現了,但是引入後者是為了更加方便的操作:

/* ZSETs use a specialized version of Skiplists */
typedef struct zskiplistNode {
    // value
    sds ele;
    // 分值
    double score;
    // 後退指標
    struct zskiplistNode *backward;
    // 層
    struct zskiplistLevel {
        // 前進指標
        struct zskiplistNode *forward;
        // 跨度
        unsigned long span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    // 跳躍表頭指標
    struct zskiplistNode *header, *tail;
    // 表中節點的數量
    unsigned long length;
    // 表中層數最大的節點的層數
    int level;
} zskiplist;

正如文章開頭畫出來的那張標準的跳躍表那樣。

隨機層數

對於每一個新插入的節點,都需要呼叫一個隨機演算法給它分配一個合理的層數,原始碼在 t_zset.c/zslRandomLevel(void) 中被定義:

int zslRandomLevel(void) {
    int level = 1;
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

直觀上期望的目標是 50% 的概率被分配到 Level 1,25% 的概率被分配到 Level 2,12.5% 的概率被分配到 Level 3,以此類推...有 2-63 的概率被分配到最頂層,因為這裡每一層的晉升率都是 50%。

Redis 跳躍表預設允許最大的層數是 32,被原始碼中 ZSKIPLIST_MAXLEVEL 定義,當 Level[0] 有 264 個元素時,才能達到 32 層,所以定義 32 完全夠用了。

建立跳躍表

這個過程比較簡單,在原始碼中的 t_zset.c/zslCreate 中被定義:

zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;

    // 申請記憶體空間
    zsl = zmalloc(sizeof(*zsl));
    // 初始化層數為 1
    zsl->level = 1;
    // 初始化長度為 0
    zsl->length = 0;
    // 建立一個層數為 32,分數為 0,沒有 value 值的跳躍表頭節點
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    
    // 跳躍表頭節點初始化
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        // 將跳躍表頭節點的所有前進指標 forward 設定為 NULL
        zsl->header->level[j].forward = NULL;
        // 將跳躍表頭節點的所有跨度 span 設定為 0
        zsl->header->level[j].span = 0;
    }
    // 跳躍表頭節點的後退指標 backward 置為 NULL
    zsl->header->backward = NULL;
    // 表頭指向跳躍表尾節點的指標置為 NULL
    zsl->tail = NULL;
    return zsl;
}

即執行完之後建立瞭如下結構的初始化跳躍表:

插入節點實現

這幾乎是最重要的一段程式碼了,但總體思路也比較清晰簡單,如果理解了上面所說的跳躍表的原理,那麼很容易理清楚插入節點時發生的幾個動作 (幾乎跟連結串列類似)

  1. 找到當前我需要插入的位置 (其中包括相同 score 時的處理)
  2. 建立新節點,調整前後的指標指向,完成插入;

為了方便閱讀,我把原始碼 t_zset.c/zslInsert 定義的插入函式拆成了幾個部分

第一部分:宣告需要儲存的變數

// 儲存搜尋路徑
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
// 儲存經過的節點跨度
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

第二部分:搜尋當前節點插入位置

serverAssert(!isnan(score));
x = zsl->header;
// 逐步降級尋找目標節點,得到 "搜尋路徑"
for (i = zsl->level-1; i >= 0; i--) {
    /* store rank that is crossed to reach the insert position */
    rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
    // 如果 score 相等,還需要比較 value 值
    while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) < 0)))
    {
        rank[i] += x->level[i].span;
        x = x->level[i].forward;
    }
    // 記錄 "搜尋路徑"
    update[i] = x;
}

討論: 有一種極端的情況,就是跳躍表中的所有 score 值都是一樣,zset 的查詢效能會不會退化為 O(n) 呢?

從上面的原始碼中我們可以發現 zset 的排序元素不只是看 score 值,也會比較 value 值 (字串比較)

第三部分:生成插入節點

/* we assume the element is not already inside, since we allow duplicated
 * scores, reinserting the same element should never happen since the
 * caller of zslInsert() should test in the hash table if the element is
 * already inside or not. */
level = zslRandomLevel();
// 如果隨機生成的 level 超過了當前最大 level 需要更新跳躍表的資訊
if (level > zsl->level) {
    for (i = zsl->level; i < level; i++) {
        rank[i] = 0;
        update[i] = zsl->header;
        update[i]->level[i].span = zsl->length;
    }
    zsl->level = level;
}
// 建立新節點
x = zslCreateNode(level,score,ele);

第四部分:重排前向指標

for (i = 0; i < level; i++) {
    x->level[i].forward = update[i]->level[i].forward;
    update[i]->level[i].forward = x;

    /* update span covered by update[i] as x is inserted here */
    x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
    update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
    update[i]->level[i].span++;
}

第五部分:重排後向指標並返回

x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
    x->level[0].forward->backward = x;
else
    zsl->tail = x;
zsl->length++;
return x;

節點刪除實現

刪除過程由原始碼中的 t_zset.c/zslDeleteNode 定義,和插入過程類似,都需要先把這個 "搜尋路徑" 找出來,然後對於每個層的相關節點重排一下前向後向指標,同時還要注意更新一下最高層數 maxLevel,直接放原始碼 (如果理解了插入這裡還是很容易理解的)

/* Internal function used by zslDelete, zslDeleteByScore and zslDeleteByRank */
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
    int i;
    for (i = 0; i < zsl->level; i++) {
        if (update[i]->level[i].forward == x) {
            update[i]->level[i].span += x->level[i].span - 1;
            update[i]->level[i].forward = x->level[i].forward;
        } else {
            update[i]->level[i].span -= 1;
        }
    }
    if (x->level[0].forward) {
        x->level[0].forward->backward = x->backward;
    } else {
        zsl->tail = x->backward;
    }
    while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)
        zsl->level--;
    zsl->length--;
}

/* Delete an element with matching score/element from the skiplist.
 * The function returns 1 if the node was found and deleted, otherwise
 * 0 is returned.
 *
 * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise
 * it is not freed (but just unlinked) and *node is set to the node pointer,
 * so that it is possible for the caller to reuse the node (including the
 * referenced SDS string at node->ele). */
int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* We may have multiple elements with the same score, what we need
     * is to find the element with both the right score and object. */
    x = x->level[0].forward;
    if (x && score == x->score && sdscmp(x->ele,ele) == 0) {
        zslDeleteNode(zsl, x, update);
        if (!node)
            zslFreeNode(x);
        else
            *node = x;
        return 1;
    }
    return 0; /* not found */
}

節點更新實現

當我們呼叫 ZADD 方法時,如果對應的 value 不存在,那就是插入過程,如果這個 value 已經存在,只是調整一下 score 的值,那就需要走一個更新流程。

假設這個新的 score 值並不會帶來排序上的變化,那麼就不需要調整位置,直接修改元素的 score 值就可以了,但是如果排序位置改變了,那就需要調整位置,該如何調整呢?

從原始碼 t_zset.c/zsetAdd 函式 1350 行左右可以看到,Redis 採用了一個非常簡單的策略:

/* Remove and re-insert when score changed. */
if (score != curscore) {
    zobj->ptr = zzlDelete(zobj->ptr,eptr);
    zobj->ptr = zzlInsert(zobj->ptr,ele,score);
    *flags |= ZADD_UPDATED;
}

把這個元素刪除再插入這個,需要經過兩次路徑搜尋,從這一點上來看,Redis 的 ZADD 程式碼似乎還有進一步優化的空間。

元素排名的實現

跳躍表本身是有序的,Redis 在 skiplist 的 forward 指標上進行了優化,給每一個 forward 指標都增加了 span 屬性,用來 表示從前一個節點沿著當前層的 forward 指標跳到當前這個節點中間會跳過多少個節點。在上面的原始碼中我們也可以看到 Redis 在插入、刪除操作時都會小心翼翼地更新 span 值的大小。

所以,沿著 "搜尋路徑",把所有經過節點的跨度 span 值進行累加就可以算出當前元素的最終 rank 值了:

/* Find the rank for an element by both score and key.
 * Returns 0 when the element cannot be found, rank otherwise.
 * Note that the rank is 1-based due to the span of zsl->header to the
 * first element. */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;

    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            // span 累加
            rank += x->level[i].span;
            x = x->level[i].forward;
        }

        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele,ele) == 0) {
            return rank;
        }
    }
    return 0;
}

擴充套件閱讀

  1. 跳躍表 Skip List 的原理和實現(Java) - https://blog.csdn.net/DERRANTCM/article/details/79063312
  2. 【演算法導論33】跳躍表(Skip list)原理與java實現 - https://blog.csdn.net/brillianteagle/article/details/52206261

參考資料

  1. 《Redis 設計與實現》 - http://redisbook.com/
  2. 【官方文件】Redis 資料型別介紹 - http://www.redis.cn/topics/data-types-intro.html
  3. 《Redis 深度歷險》 - https://book.douban.com/subject/30386804/
  4. Redis 原始碼 - https://github.com/antirez/redis
  5. Redis 快速入門 - 易百教程 - https://www.yiibai.com/redis/redis_quick_guide.html
  6. Redis【入門】就這一篇! - https://www.wmyskxz.com/2018/05/31/redis-ru-men-jiu-zhe-yi-pian/
  7. Redis為什麼用跳錶而不用平衡樹? - https://mp.weixin.qq.com/s?__biz=MzA4NTg1MjM0Mg==&mid=2657261425&idx=1&sn=d840079ea35875a8c8e02d9b3e44cf95&scene=21#wechat_redirect
  8. 為啥 redis 使用跳錶(skiplist)而不是使用 red-black? - 知乎@於康 - https://www.zhihu.com/question/20202931
  • 本文已收錄至我的 Github 程式設計師成長系列 【More Than Java】,學習,不止 Code,歡迎 star:https://github.com/wmyskxz/MoreThanJava
  • 個人公眾號 :wmyskxz,個人獨立域名部落格:wmyskxz.com,堅持原創輸出,下方掃碼關注,2020,與您共同成長!

非常感謝各位人才能 看到這裡,如果覺得本篇文章寫得不錯,覺得 「我沒有三顆心臟」有點東西 的話,求點贊,求關注,求分享,求留言!

創作不易,各位的支援和認可,就是我創作的最大動力,我們下篇文章見