1. 程式人生 > >redis個人理解4--hash物件的儲存

redis個人理解4--hash物件的儲存

redis的hash的儲存

1.redis-obj基本介紹

眾所周知,redis支援5種基礎資料型別,分別是:

  • string
  • list
  • set
  • hset
  • hash

每種資料型別都存在至少一種encoding方式。redis把上面幾種基礎型別抽象成為一個結構體叫做 redisObject

typedef struct redisObject {
    unsigned type:4;   //type就是redis的基礎資料型別
    unsigned encoding:4;   //這個是具體資料型別的編碼方式
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;
    void *ptr;
} robj;

複製程式碼

本文就重點介紹下hash型別在redis中是如何儲存和使用的。

2. redis hash型別

hash型別是一個可以儲存多個k-v鍵值對的結構,典型的樣子是這樣的:

其實具體的命令檢視redis的官方文件是最方便的,但是我還是把常用的總結下,也給自己加深下影響。

2.1 hash的典型命令

典型的命令格式:

hset redis-obj-name k1 v1 k2 v2 ...


hget redis_obj_name k1

注意這個命令是操作hash物件的,和hset物件沒有關係,不要搞混淆了。 例如:

redis> HSET myhash field1 "Hello"
(integer) 1 redis> HGET myhash field1 "Hello" redis> 複製程式碼

看上去很簡單,那麼這個myhash物件在redis的記憶體中是如何儲存的呢?直接上原始碼,大家看的比較清楚:

void hsetCommand(client *c) {
    int i, created = 0;
    robj *o;

    //首先引數必須是雙數,很好理解
    if ((c->argc % 2) == 1) {
        addReplyError(c,"wrong number of arguments for HMSET"
); return; } //函式名稱寫的很清楚,找不到就建立一個redis-obj物件 if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; hashTypeTryConversion(o,c->argv,2,c->argc-1);//這裡是兩點,它居然會嘗試去轉換下hashtype for (i = 2; i < c->argc; i += 2) created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY); /* HMSET (deprecated) and HSET return value is different. */ char *cmdname = c->argv[0]->ptr; if (cmdname[1] == 's' || cmdname[1] == 'S') { /* HSET */ addReplyLongLong(c, created); } else { /* HMSET */ addReply(c, shared.ok); } signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id); server.dirty++; } 複製程式碼

那我們看看hashTypeLookupWriteOrCreatehashTypeTryConversion到底幹了啥事。

robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (o == NULL) {
        o = createHashObject();  //這裡會去建立一個hash objecjt
        dbAdd(c->db,key,o);
    } else {
        if (o->type != OBJ_HASH) {
            addReply(c,shared.wrongtypeerr);
            return NULL;
        }
    }
    return o;
}

robj *createHashObject(void) {
    unsigned char *zl = ziplistNew();
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_ZIPLIST;
    return o;
}

複製程式碼

看上面,createHashObject函式其實建立的redis-obj的type是hash型別,但是encoding卻是OBJ_ENCODING_ZIPLIST,看到這裡會有點疑惑,既然是hash型別應該用hash table結構來儲存,為什麼用壓縮連結串列結構呢?其實不用急,還有一個函式hashTypeTryConversion這個函式沒有看,現在再看看它的實現:

/* Check the length of a number of objects to see if we need to convert a
 * ziplist to a real hash. Note that we only check string encoded objects
 * as their string length can be queried in constant time. */

void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
    int i;

    if (o->encoding != OBJ_ENCODING_ZIPLIST) return;

    for (i = start; i <= end; i++) {
        if (sdsEncodedObject(argv[i]) &&
            sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
        {
            hashTypeConvert(o, OBJ_ENCODING_HT);
            break;
        }
    }
}

複製程式碼

其實上面的註釋寫的很清楚,如果是ZIPLIST的編碼方式,遍歷下ziplist,如果當前的長度已經大於server.hash_max_ziplist_value,就把encoding方式改為OBJ_ENCODING_HT。還有一種情況是當

hash-max-ziplist-entries 512  
hash-max-ziplist-value 64
複製程式碼

看到這裡貌似有點明白了,原來redis對於小數字,短字串,為了能比較高效的利用記憶體,都儲存到ziplist中,而不是直接放到hash-table結構中,當數字或者字串超出一定的閾值時候,才會改用hash表的儲存方式,這樣達到節約記憶體的作用啊。在這裡不得不感嘆下redis的作者真不怕麻煩,為了能節約一點記憶體,可以說費勁了心思。
總結下,redis對於hash物件提供了兩種儲存方式,也就是redisObject.encoding變數的取值是有兩個的,分別如下:

  • OBJ_ENCODING_ZIPLIST
  • OBJ_ENCODING_HT

這兩種編碼方式內部的資料結構是什麼樣子的呢? 首先我們先看看OBJ_ENCODING_ZIPLIST 型別的儲存方式

2.2 OBJ_ENCODING_ZIPLIST儲存方式

createHashObject函式中,呼叫了ziplist的建立函式ziplistNew,我們來看下這個函式的實現:

/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {
    unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
    unsigned char *zl = zmalloc(bytes);
    ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
    ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
    ZIPLIST_LENGTH(zl) = 0;
    zl[bytes-1] = ZIP_END;
    return zl;
}

複製程式碼

程式碼裡面用了一堆巨集,看上去不太直觀,畫個圖看下,就很清晰了:

再附上ziplist的header的註釋:

/* The size of a ziplist header: two 32 bit integers for the total
 * bytes count and last item offset. One 16 bit integer for the number
 * of items field. */
複製程式碼

結合程式碼很輕鬆就應該能看懂了。

再看上面的程式碼hsetCommand中,呼叫了hashTypeSet函式進行插入資料 我們再看看對於OBJ_ENCODING_ZIPLIST的編碼方式,如何插入資料。

int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;

    if (o->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            fptr = ziplistFind(fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = ziplistNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;

                /* Delete value */
                zl = ziplistDelete(zl, &vptr);

                /* Insert new value */
                zl = ziplistInsert(zl, vptr, (unsigned char*)value,
                        sdslen(value));
            }
        }
        o->ptr = zl;

        /* Check if the ziplist needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
        
    ...
}

複製程式碼

首次插入的時候,ziplistIndex(zl, ZIPLIST_HEAD);函式會返回NULL

unsigned char *ziplistIndex(unsigned char *zl, int index) {
    unsigned char *p;
    unsigned int prevlensize, prevlen = 0;
    if (index < 0) {
        index = (-index)-1;
        p = ZIPLIST_ENTRY_TAIL(zl);
        if (p[0] != ZIP_END) {
            ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            while (prevlen > 0 && index--) {
                p -= prevlen;
                ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
            }
        }
    } else {
        p = ZIPLIST_ENTRY_HEAD(zl);
        while (p[0] != ZIP_END && index--) {
            p += zipRawEntryLength(p);
        }
    }
    return (p[0] == ZIP_END || index > 0) ? NULL : p;
}

複製程式碼

進而直接呼叫ziplistPush把field和value都插入到ziplist中。再插入過後,還再多了一次判斷當前的ziplist的長度是不是大於了server.hash_max_ziplist_entries,如果是,就需要轉換為hashtable結構儲存。

unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    return __ziplistInsert(zl,p,s,slen);
}

unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    reqlen += zipStorePrevEntryLength(NULL,prevlen);
    reqlen += zipStoreEntryEncoding(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry's length in
     * its prevlen field. */
    int forcelarge = 0;
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
    if (nextdiff == -4 && reqlen < 4) {
        nextdiff = 0;
        forcelarge = 1;
    }

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry's raw length in the next entry. */
        if (forcelarge)
            zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
        else
            zipStorePrevEntryLength(p+reqlen,reqlen);

        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn't have an effect on the *tail* offset. */
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipStorePrevEntryLength(p,prevlen);  
    p += zipStoreEntryEncoding(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}

複製程式碼

插入的時候可以看出來,redis對於ziplist的儲存資料結構也是比較特殊的。一個item項的結構如下:

  p += zipStorePrevEntryLength(p,prevlen);    //計算上一個item項的長度
  p += zipStoreEntryEncoding(p,encoding,slen); //計算當前自己需要的編碼
複製程式碼

其中prev_entry_length儲存的是上一個item項的長度,這個也是redis比較特殊的地方,在本次更新item的時候採取計算上一個item項的長度。

encoding是當前這一項的編碼方式。ziplist既然是壓縮連結串列,本質上只是是對數字型別的壓縮,字串數字都統一轉換為int8, int16, int32, int64 來儲存,這樣比較節約記憶體。

具體的程式碼實現如下:

  /* See if the entry can be encoded */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* 'encoding' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* 'encoding' is untouched, however zipStoreEntryEncoding will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }

複製程式碼

具體的zipTryEncoding 程式碼實現:

/* Check if string pointed to by 'entry' can be encoded as an integer.
 * Stores the integer value in 'v' and its encoding in 'encoding'. */
int zipTryEncoding(unsigned char *entry, unsigned int entrylen, long long *v, unsigned char *encoding) {
    long long value;

    if (entrylen >= 32 || entrylen == 0) return 0;
    if (string2ll((char*)entry,entrylen,&value)) {
        /* Great, the string can be encoded. Check what's the smallest
         * of our encoding types that can hold this value. */
        if (value >= 0 && value <= 12) {
            *encoding = ZIP_INT_IMM_MIN+value;
        } else if (value >= INT8_MIN && value <= INT8_MAX) {
            *encoding = ZIP_INT_8B;
        } else if (value >= INT16_MIN && value <= INT16_MAX) {
            *encoding = ZIP_INT_16B;
        } else if (value >= INT24_MIN && value <= INT24_MAX) {
            *encoding = ZIP_INT_24B;
        } else if (value >= INT32_MIN && value <= INT32_MAX) {
            *encoding = ZIP_INT_32B;
        } else {
            *encoding = ZIP_INT_64B;
        }
        *v = value;
        return 1;
    }
    return 0;
}


複製程式碼

其中string2ll其實就是一個atoi,但是要實現一個沒bug的atoi還是很難的,看看redis的實現,覺得考慮的好全面,負數,越界都考慮清楚,感覺還是很難的。

/* Convert a string into a long long. Returns 1 if the string could be parsed
 * into a (non-overflowing) long long, 0 otherwise. The value will be set to
 * the parsed value when appropriate.
 *
 * Note that this function demands that the string strictly represents
 * a long long: no spaces or other characters before or after the string
 * representing the number are accepted, nor zeroes at the start if not
 * for the string "0" representing the zero number.
 *
 * Because of its strictness, it is safe to use this function to check if
 * you can convert a string into a long long, and obtain back the string
 * from the number without any loss in the string representation. */
int string2ll(const char *s, size_t slen, long long *value) {
    const char *p = s;
    size_t plen = 0;
    int negative = 0;
    unsigned long long v;

    /* A zero length string is not a valid number. */
    if (plen == slen)
        return 0;

    /* Special case: first and only digit is 0. */
    if (slen == 1 && p[0] == '0') {
        if (value != NULL) *value = 0;
        return 1;
    }

    /* Handle negative numbers: just set a flag and continue like if it
     * was a positive number. Later convert into negative. */
    if (p[0] == '-') {
        negative = 1;
        p++; plen++;

        /* Abort on only a negative sign. */
        if (plen == slen)
            return 0;
    }

    /* First digit should be 1-9, otherwise the string should just be 0. */
    if (p[0] >= '1' && p[0] <= '9') {
        v = p[0]-'0';
        p++; plen++;
    } else {
        return 0;
    }

    /* Parse all the other digits, checking for overflow at every step. */
    while (plen < slen && p[0] >= '0' && p[0] <= '9') {
        if (v > (ULLONG_MAX / 10)) /* Overflow. */
            return 0;
        v *= 10;

        if (v > (ULLONG_MAX - (p[0]-'0'))) /* Overflow. */
            return 0;
        v += p[0]-'0';

        p++; plen++;
    }

    /* Return if not all bytes were used. */
    if (plen < slen)
        return 0;

    /* Convert to negative if needed, and do the final overflow check when
     * converting from unsigned long long to long long. */
    if (negative) {
        if (v > ((unsigned long long)(-(LLONG_MIN+1))+1)) /* Overflow. */
            return 0;
        if (value != NULL) *value = -v;
    } else {
        if (v > LLONG_MAX) /* Overflow. */
            return 0;
        if (value != NULL) *value = v;
    }
    return 1;
}

複製程式碼

其實每次更新,都會觸發記憶體的realloc,這個地方我感覺其實還是不太好的,如果一次更新n個kv對,就需要呼叫realloc函式n次,感覺有點浪費啊。

2.2 OBJ_ENCODING_HT儲存方式

從上面的程式碼可以看出來有兩種場景會觸發hash obj修改encoding方式,分別如下:

hash-max-ziplist-entries 512
hash-max-ziplist-value 64
複製程式碼

當ziplist的entry個數小於512的時候, 還有一種場景是entry的值長度小於64的時候。當然這其實是redis的一個配置項。

那麼hash table儲存又是什麼樣的結構呢?看下面的程式碼:

void hashTypeConvertZiplist(robj *o, int enc) {
    serverAssert(o->encoding == OBJ_ENCODING_ZIPLIST);

    if (enc == OBJ_ENCODING_ZIPLIST) {
        /* Nothing to do... */

    } else if (enc == OBJ_ENCODING_HT) {
        hashTypeIterator *hi;
        dict *dict;
        int ret;

        hi = hashTypeInitIterator(o);
        dict = dictCreate(&hashDictType, NULL);

        while (hashTypeNext(hi) != C_ERR) {
            sds key, value;

            key = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
            value = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
            ret = dictAdd(dict, key, value);
            if (ret != DICT_OK) {
                serverLogHexDump(LL_WARNING,"ziplist with dup elements dump",
                    o->ptr,ziplistBlobLen(o->ptr));
                serverPanic("Ziplist corruption detected");
            }
        }
        hashTypeReleaseIterator(hi);
        zfree(o->ptr);
        o->encoding = OBJ_ENCODING_HT;
        o->ptr = dict;
    } else {
        serverPanic("Unknown hash encoding");
    }
}


複製程式碼

可以看出會建立一個迭代器,遍歷當前的ziplist結構,然後放到新建立的dict結構中。

關於dict的結構,可以參看之前我的一篇dict的資料結構分析

3. 總結

hash物件的儲存如果使用的編碼是ZipList的時候,感覺效率是不高的,平均複雜度是O(n),如果涉及到記憶體的連鎖移動的話,最差的事件複雜度其實是o(n^2)