1. 程式人生 > >Redis原始碼解析——字典遍歷

Redis原始碼解析——字典遍歷

        之前兩篇博文講解了字典庫的基礎,本文將講解其遍歷操作。之所以將遍歷操作獨立成一文來講,是因為其中的內容和之前的基本操作還是有區別的。特別是高階遍歷一節介紹的內容,充滿了精妙設計的演算法智慧。(轉載請指明出於breaksoftware的csdn部落格)

迭代器遍歷

        由於Redis字典庫有rehash機制,而且是漸進式的,所以迭代器操作可能會通過其他特殊方式來實現,以保證能遍歷到所有資料。但是閱讀完原始碼發現,其實這個迭代器是個受限的迭代器,實現方法也很簡單。我們先看下其基礎結構:

typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;

        成員變數d指向迭代器處理的字典。index是dictht中table陣列的下標。table是dict結構中dictht陣列的下標,即標識ht[0]還是ht[1]。safe欄位用於標識該迭代器是否為一個安全的迭代器。如果是,則可以在迭代過程中使用dictDelete、dictFind等方法;如果不是,則只能使用dictNext遍歷方法。entry和nextEntry分別指向當前的元素和下一個元素。fingerprint是字典的指紋,我們可以先看下指紋演算法的實現:

long long dictFingerprint(dict *d) {
    long long integers[6], hash = 0;
    int j;

    integers[0] = (long) d->ht[0].table;
    integers[1] = d->ht[0].size;
    integers[2] = d->ht[0].used;
    integers[3] = (long) d->ht[1].table;
    integers[4] = d->ht[1].size;
    integers[5] = d->ht[1].used;

    /* We hash N integers by summing every successive integer with the integer
     * hashing of the previous sum. Basically:
     *
     * Result = hash(hash(hash(int1)+int2)+int3) ...
     *
     * This way the same set of integers in a different order will (likely) hash
     * to a different number. */
    for (j = 0; j < 6; j++) {
        hash += integers[j];
        /* For the hashing step we use Tomas Wang's 64 bit integer hash. */
        hash = (~hash) + (hash << 21); // hash = (hash << 21) - hash - 1;
        hash = hash ^ (hash >> 24);
        hash = (hash + (hash << 3)) + (hash << 8); // hash * 265
        hash = hash ^ (hash >> 14);
        hash = (hash + (hash << 2)) + (hash << 4); // hash * 21
        hash = hash ^ (hash >> 28);
        hash = hash + (hash << 31);
    }
    return hash;
}

        可以見得,它使用了ht[0]和ht[1]的相關資訊進行Hash運算,從而得到該字典的指紋。我們可以發現,如果dictht的table、size和used任意一個有變化,則指紋將被改變。這也就意味著,擴容、鎖容、rehash、新增元素和刪除元素都會改變指紋(除了修改元素內容)。
        生成一個迭代器的方法很簡單,該字典庫提供了兩種方式:

dictIterator *dictGetIterator(dict *d)
{
    dictIterator *iter = zmalloc(sizeof(*iter));

    iter->d = d;
    iter->table = 0;
    iter->index = -1;
    iter->safe = 0;
    iter->entry = NULL;
    iter->nextEntry = NULL;
    return iter;
}

dictIterator *dictGetSafeIterator(dict *d) {
    dictIterator *i = dictGetIterator(d);

    i->safe = 1;
    return i;
}

        然後我們看下遍歷迭代器的操作。如果是初次迭代,則要檢視是否是安全迭代器,如果是安全迭代器則讓其對應的字典物件的iterators自增;如果不是則記錄當前字典的指紋

dictEntry *dictNext(dictIterator *iter)
{
    while (1) {
        if (iter->entry == NULL) {
            dictht *ht = &iter->d->ht[iter->table];
            if (iter->index == -1 && iter->table == 0) {
                if (iter->safe)
                    iter->d->iterators++;
                else
                    iter->fingerprint = dictFingerprint(iter->d);
            }

        因為要遍歷的時候,字典可以已經處於rehash的中間狀態,所以還要遍歷ht[1]中的元素

            iter->index++;
            if (iter->index >= (long) ht->size) {
                if (dictIsRehashing(iter->d) && iter->table == 0) {
                    iter->table++;
                    iter->index = 0;
                    ht = &iter->d->ht[1];
                } else {
                    break;
                }
            }
            iter->entry = ht->table[iter->index];
        } else {
            iter->entry = iter->nextEntry;
        }

        往往使用迭代器獲得元素後,會讓字典刪除這個元素,這個時候就無法通過迭代器獲取下一個元素了,於是作者設計了nextEntry來記錄當前物件的下一個物件指標

        if (iter->entry) {
            /* We need to save the 'next' here, the iterator user
             * may delete the entry we are returning. */
            iter->nextEntry = iter->entry->next;
            return iter->entry;
        }
    }
    return NULL;
}

        遍歷完成後,要呼叫下面方法釋放迭代器。需要注意的是,如果是安全迭代器,就需要讓其指向的字典的iterators自減以還原;如果不是,則需要檢測前後字典的指紋是否一致

void dictReleaseIterator(dictIterator *iter)
{
    if (!(iter->index == -1 && iter->table == 0)) {
        if (iter->safe)
            iter->d->iterators--;
        else
            assert(iter->fingerprint == dictFingerprint(iter->d));
    }
    zfree(iter);
}

        最後我們探討下什麼是安全迭代器。原始碼中我們看到如果safe為1,則讓字典iterators自增,這樣dict字典庫中的操作就不會觸發rehash漸進,從而在一定程度上(消除rehash影響,但是無法阻止使用者刪除元素)保證了字典結構的穩定。如果不是安全迭代器,則只能使用dictNext方法遍歷元素,而像獲取元素值的dictFetchValue方法都不能呼叫。因為dictFetchValue底層會呼叫_dictRehashStep讓字典結構發生改變。

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}

        但是作者在原始碼說明中說安全迭代器在迭代過程中可以使用dictAdd方法,但是我覺得這個說法是錯誤的。因為dictAdd方法插入的元素可能在當前遍歷的物件之前,這樣就在之後的遍歷中無法遍歷到;也可能在當前遍歷的物件之後,這樣就在之後的遍歷中可以遍歷到。這樣一種動作,兩種可能結果的方式肯定是有問題的。我查了下該庫在Redis中的應用,遍歷操作不是為了獲取值就是為了刪除值,而沒有增加元素的操作,如

void clusterBlacklistCleanup(void) {
    dictIterator *di;
    dictEntry *de;

    di = dictGetSafeIterator(server.cluster->nodes_black_list);
    while((de = dictNext(di)) != NULL) {
        int64_t expire = dictGetUnsignedIntegerVal(de);

        if (expire < server.unixtime)
            dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
    }
    dictReleaseIterator(di);
}

高階遍歷

        高階遍歷允許ht[0]和ht[1]之間資料在遷移過程中進行遍歷,通過相應的演算法可以保證所有的元素都可以被遍歷到。我們先看下功能的實現:

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       void *privdata)

        引數d是字典的指標;v是迭代器,這個迭代器初始值為0,每次呼叫dictScan都會返回一個新的迭代器。於是下次呼叫這個函式時要傳入新的迭代器的值。fn是個函式指標,每遍歷到一個元素時,都是用該函式對元素進行操作。

typedef void (dictScanFunction)(void *privdata, const dictEntry *de);

        Redis中這個方法的呼叫樣例是:

        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);

        對於不在rehash狀態的字典,則只要對ht[0]中迭代器指向的連結串列進行遍歷就行了

    dictht *t0, *t1;
    const dictEntry *de;
    unsigned long m0, m1;

    if (dictSize(d) == 0) return 0;

    if (!dictIsRehashing(d)) {
        t0 = &(d->ht[0]);
        m0 = t0->sizemask;

        /* Emit entries at cursor */
        de = t0->table[v & m0];
        while (de) {
            fn(privdata, de);
            de = de->next;
        }

        如果在rehash狀態,就要遍歷ht[0]和ht[1]。遍歷前要確定哪個dictht.table長度短(假定其長度為len=8),先對短的中該迭代器(假定為iter=4)對應的鏈進行遍歷,然後遍歷大的。然而不僅要遍歷大的dictht中迭代器(iter=4)對應的鏈,還要遍歷比iter大len的迭代器(4+8=12)對應的連結串列。

    } else {
        t0 = &d->ht[0];
        t1 = &d->ht[1];

        /* Make sure t0 is the smaller and t1 is the bigger table */
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }

        m0 = t0->sizemask;
        m1 = t1->sizemask;

        /* Emit entries at cursor */
        de = t0->table[v & m0];
        while (de) {
            fn(privdata, de);
            de = de->next;
        }

        /* Iterate over indices in larger table that are the expansion
         * of the index pointed to by the cursor in the smaller table */
        do {
            /* Emit entries at cursor */
            de = t1->table[v & m1];
            while (de) {
                fn(privdata, de);
                de = de->next;
            }

            /* Increment bits not covered by the smaller mask */
            v = (((v | m0) + 1) & ~m0) | (v & m0);

            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));
    }

        最後要重新計算下次使用的迭代器並返回

    /* Set unmasked bits so incrementing the reversed cursor
     * operates on the masked bits of the smaller table */
    v |= ~m0;

    /* Increment the reverse cursor */
    v = rev(v);
    v++;
    v = rev(v);

    return v;
}

        從上面的設計來看,呼叫dictScan時不能有多執行緒操作該字典,否則會出現遺漏遍歷的情況。但是在每次呼叫dictScan之間可以對字典進行操作。

        其實這個遍歷中最核心的是迭代器v的計算方法,我們只要讓v從0開始,執行“或操作”最短ht.table(~m0)大小、二進位制翻轉、加1、再二進位制翻轉就可以實現0到~m0的遍歷。我們看個例子:

        我一直想不出這套演算法為什麼能滿足這樣的特點,還是需要數學大神解釋一下。同時也可見這種演算法的作者Pieter Noordhuis數學有一定功底。

        關鍵這樣的演算法不僅可以完成遍歷,還可以在陣列大小動態變化時保證元素被全部遍歷到。我把程式碼提煉出來,模擬了長度為8的陣列向長度為16的陣列擴容,和長度為16的陣列向長度為8的陣列縮容的過程。為了讓問題簡單化,我們先不考慮兩個陣列的問題,只認為陣列在一瞬間被擴容和縮容。

        我們先看下擴容前的遍歷過程


        假如第8次迭代後,陣列瞬間擴容,這個時候遍歷過程是


        此時多了一次對下標為15的遍歷,可以想象這次遍歷應該會重複下標為15%8=7遍歷(即第8次)的元素。所以dictScan具有潛在對一個元素遍歷多次的問題。我們再看第7次迭代時發生瞬間擴容的情況


        此時陣列下標為11的遍歷(即第8次遍歷)會部分重複下標為3的遍歷(即第7次遍歷)元素。而之後的遍歷就不會重複了。

        我們再看下陣列的縮容。為縮容前的狀態是


        如果第16次遍歷時突然縮容,則遍歷過程是


        可見第16次遍歷的是新陣列下標為7的元素,和第15次遍歷老陣列下標為7的元素不同,本次遍歷的結果包含前者(因為它還包含之前下標為15的元素)。所以也存在元素重複遍歷的問題。

        我們看下第15次遍歷時突然縮容的遍歷過程

        因為縮容到8,所以最後一次遍歷下標7的情況,既包括之前老陣列下標為7的元素,也包含老陣列下標為15的元素。所以本次遍歷不會產生重複遍歷元素的問題。

        我們再看下第14次遍歷突然縮容的遍歷過程

        第14次本來是要遍歷下標為11的元素。由於發生縮容,就遍歷新的陣列的下標為3的元素。所以第14的遍歷包含第13次的遍歷元素。

        一個數組如此,像dict結構中有兩個dictht的情況,則稍微複雜點。我們通過下圖可以發現,不同時機ht[0]擴容或者縮容,都可以保證元素被全遍歷

        上面測試的程式碼是:

#define TWO_FOUR_MASK 15
#define TWO_THREE_MASK 7

static unsigned long rev(unsigned long v) {
    unsigned long s = 8 * sizeof(v);
    unsigned long mask = ~0;
    while ((s >>= 1) > 0) {
        mask ^= (mask <<s);
        v = ((v >> s) & mask) | ((v << s) & ~mask);
    }
    return v;
}

unsigned long loop_single_expand_shrinks(unsigned long v, int change, int expand) {
    unsigned long m0 = 0;

    if (expand) {
        if (change) {
            m0 = TWO_FOUR_MASK;
        }
        else {
            m0 = TWO_THREE_MASK;
        }
    }
    else {
        if (change) {
            m0 = TWO_THREE_MASK;
        }
        else {
            m0 = TWO_FOUR_MASK;
        }
    }

    unsigned long t0idx = t0idx = v & m0; 
    printf(" t0Index: %lu ", t0idx);

    v |= ~m0;
    v = rev(v);
    v++;
    v = rev(v);
    return v;
}

unsigned long loop(unsigned long v) {
    unsigned long m0 = TWO_THREE_MASK;
    unsigned long m1 = TWO_FOUR_MASK;

    unsigned long t0idx = v & m0;
    printf(" t0Index: %lu ", t0idx);

    printf(" t1Index: ");
    do {
        unsigned long t1idx = v & m1;
        printf("%lu ", t1idx);
        v = (((v | m0) + 1) & ~ m0) | (v & m0);
    } while (v & (m0 ^ m1));

    v |= ~m0;
    v = rev(v);
    v++;
    v = rev(v);
    return v;
}

unsigned long loop_expand_shrinks(unsigned long v, int change, int expand) {
    unsigned long m0 = 0;
    unsigned long m1 = 0;
    if (!change) {
        m0 = TWO_THREE_MASK;
        m1 = TWO_FOUR_MASK;

        unsigned long t0idx = v & m0;
        if (expand) {
            printf(" t0Index: %lu ", t0idx);
            printf(" t1Index: ");
        }
        else {
            printf(" t1Index: %lu ", t0idx);
            printf(" t0Index: ");
        }
        
        do {
            unsigned long t1idx = v & m1;
            printf("%lu ", t1idx);
            v = (((v | m0) + 1) & ~ m0) | (v & m0);
        } while (v & (m0 ^ m1));
    }
    else {
        if (expand) {
            m0 = TWO_FOUR_MASK;
        }
        else {
            m0 = TWO_THREE_MASK;
        }

        unsigned long t0idx = v & m0;
        printf(" t0Index: %lu ", t0idx);
    }

    v |= ~m0;
    v = rev(v);
    v++;
    v = rev(v);
    return v;
}

void print_binary(unsigned long v) {
    char s[128] = {0};
    _itoa_s(v, s, sizeof(s), 2);
    printf("0x%032s", s);
}

void check_loop_normal() {
    unsigned long v = 0;
    do 
    {
        print_binary(v);
        v = loop(v);
        printf("\n");
    } while (v != 0);
}

void check_loop_expand_shrinks(int expand) {
    int loop_count = 9;

    for (int n  = 0; n < loop_count; n++) {
        unsigned long v = 0;
        int change = 0;
        int call_count = 0;
        do 
        {
            if (call_count == n) {
                change = 1;
            }
            print_binary(v);
            v = loop_expand_shrinks(v, change, expand);
            call_count++;
            printf("\n");
        } while (v != 0);
        printf("\n");
    }
}

void check_loop_single_expand_shrinks(int expand) {
    int loop_count = 17;

    for (int n  = 0; n < loop_count; n++) {
        unsigned long v = 0;
        int change = 0;
        int call_count = 0;
        do 
        {
            if (call_count == n) {
                change = 1;
            }
            print_binary(v);
            v = loop_single_expand_shrinks(v, change, expand);
            call_count++;
            printf("\n");
        } while (v != 0);
        printf("\n");
    }
}