1. 程式人生 > >魔改redis之新增命令hrandmember

魔改redis之新增命令hrandmember

# 魔改redis之新增命令hrandmember [toc] ## 正文 ### 前言 想從`redis`的`hash`表獲取隨機的鍵值對,但是發現`redis`只支援`set`的隨機值[SRANDMEMBER](https://redis.io/commands/srandmember)。但是如果把`hash`表中的資料又存一份,佔用的空間又太大。也可以通過先`HLEN`獲取`hash`表的大小,隨機出一個偏移值,再呼叫`HSCAN`獲得一組資料。或者直接多次隨機,多次取值。但這樣效率始終不如`SRANDMEMBER`(`redis`的開銷主要是網路的開銷)。於是想到魔改`redis`程式碼,使其支援對`hash`表的隨機鍵值對。客戶端`jedis`打算使用`eval`來呼叫`redis`中新加入的指令。 ### Set型別與srandmember命令 `Set`型別的編碼可以是`OBJ_ENCODING_HT`或者`OBJ_ENCODING_INTSET`,如果集合中的值全是數值,那麼`Set`的編碼(底層型別)為`OBJ_ENCODING_INTSET`, 如果加入了無法被解析為數值的字串,或者`set`的大小超過了`OBJ_SET_MAX_INTSET_ENTRIES`預設`512`,編碼則會變更為`OBJ_ENCODING_HT`。 `OBJ_ENCODING_INTSET`就是儲存著整數的有序陣列。加入新值時新`realloc`新增記憶體,再使用`memmove`將對應位置後的資料後移,然後在對應的位置加入值。 `OBJ_ENCODING_HT`編碼就是`dict`型別,也就是字典。 `srandmember`命令的主要處理函式是`srandmemberWithCountCommand`,如果傳入的`count`值是負數,意味著值可以重複。 1. 如果值可以重複,那麼每次隨機取出一個成員。 2. 如果`set`的`size`小於請求的數量,則返回`set`集合中全部的值。 ```c //case 1 if (!uniq) { addReplyMultiBulkLen(c,count); while(count--) { encoding = setTypeRandomElement(set,&ele,&llele); if (encoding == OBJ_ENCODING_INTSET) { addReplyBulkLongLong(c,llele); } else { addReplyBulkCBuffer(c,ele,sdslen(ele)); } } return; } //case 2 if (count >= size) { sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION); return; } ``` 3. 集合的數量沒有遠遠大於請求的數量。將`set`的值複製到`dict`中,然後隨機刪除值,直到數量等於請求的值。 4. 集合數量遠大請求的數量。隨機取值,加入`dict`中,數量滿足後返回`dict`中的值。 ```C if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) { setTypeIterator *si; /* Add all the elements into the temporary dictionary. */ si = setTypeInitIterator(set); while((encoding = setTypeNext(si,&ele,&llele)) != -1) { int retval = DICT_ERR; if (encoding == OBJ_ENCODING_INTSET) { retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL); } else { retval = dictAdd(d,createStringObject(ele,sdslen(ele)),NULL); } serverAssert(retval == DICT_OK); } setTypeReleaseIterator(si); serverAssert(dictSize(d) == size); /* Remove random elements to reach the right count. */ while(size > count) { dictEntry *de; de = dictGetRandomKey(d); dictDelete(d,dictGetKey(de)); size--; } } else { unsigned long added = 0; robj *objele; while(added < count) { encoding = setTypeRandomElement(set,&ele,&llele); if (encoding == OBJ_ENCODING_INTSET) { objele = createStringObjectFromLongLong(llele); } else { objele = createStringObject(ele,sdslen(ele)); } /* Try to add the object to the dictionary. If it already exists * free it, otherwise increment the number of objects we have * in the result dictionary. */ if (dictAdd(d,objele,NULL) == DICT_OK) added++; else decrRefCount(objele); } } /* CASE 3 & 4: send the result to the user. */ { dictIterator *di; dictEntry *de; addReplyMultiBulkLen(c,count); di = dictGetIterator(d); while((de = dictNext(di)) != NULL) addReplyBulk(c,dictGetKey(de)); dictReleaseIterator(di); dictRelease(d); } ``` ### Hash型別對比Set型別 Hash型別和Set型別的關係非常密切,在`java`原始碼中,往往`set`型別就是由`hash`型別實現的。在`redis`中在資料量較大的時候也十分相似。 前文提到 `Set`型別的編碼可以是`intset`或者是`dict`,`ziplist`的編碼是`ziplist`或者是`dict`。在當前的redis版本中,還並沒有新增`hrandmember`命令(6.2及之前)。 `ziplist`中的字串長度超過`OBJ_HASH_MAX_ZIPLIST_VALUE`(預設值為64),或者`entry`的個數超過`OBJ_HASH_MAX_ZIPLIST_ENTRIES`(預設值為512),則會轉化為`hashtable`編碼。 `ziplist`的`encoding`就是嘗試將字串值解析成`long`並儲存編碼。`hash`型別和 `set`型別最大的區別在於元素個數較少時,內部的編碼不同,`hash`內部的編碼是`ziplist`,而`set`的內部編碼是`intset`,(個人認為`hash`和`intset`內部編碼不統一是一處失誤,使用者對兩者有著相似的用法,也就是需求類似,然而底層實現卻不同,必然導致程式碼的重複,也確實如此,`redis`團隊似乎因為這個原因遲遲沒有新增`hrandmember`命令) ### hrandmember命令 因為`ziplist`不能被隨機訪問。對於`ziplist`編碼的`hash`表,我們採用以下演算法,來保證每個被取出來的`entry`的概率是一樣的。 **我們從長度為m的`ziplist`中取出n個entry,m>=n,設剩下的長度為m left ,剩餘要取的個數為nleft,每次取球時,我們取它的概率為 nleft/m left 。** 這樣能保證每個球被取出的概率相同,為`n/m`。可用數學歸納法證明。 通過使用這種方式,我們將時間複雜度從O(nm)降為O(m)。 處理`hash`編碼,我們複製`srandmember`的程式碼,並稍作修改,避免字串的複製以提高效率。 注意:當編碼為`ziplist`時,不支援負數的count。雖然也有返回值,但並不會重複,並且個數小於期望值。 ```c void hrandmemberWithCountCommand(client *c, long l) { unsigned long entryCount, hashSize; int uniq = 1; hashTypeIterator *hi; robj *hash; dict *d; double randomDouble; double threshold; unsigned long index = 0; if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL || checkType(c,hash,OBJ_HASH)) return; if(l >= 0) { entryCount = (unsigned long) l; } else { entryCount = -l; uniq = 0; } hashSize = hashTypeLength(hash); if(entryCount > hashSize) entryCount = hashSize; addReplyMapLen(c, entryCount); hi = hashTypeInitIterator(hash); if(hash->encoding == OBJ_ENCODING_ZIPLIST) { while (hashTypeNext(hi) != C_ERR && entryCount != 0) { randomDouble = ((double)rand()) / RAND_MAX; threshold = ((double)entryCount) / (hashSize - index); if(randomDouble < threshold){ entryCount--; addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY); addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE); } index ++; } } else { // copy of srandmember if(!uniq) { while(entryCount--) { sds key, value; dictEntry *de = dictGetRandomKey(hash->ptr); key = dictGetKey(de); value = dictGetVal(de); addReplyBulkCBuffer(c,key,sdslen(key)); addReplyBulkCBuffer(c,value,sdslen(value)); } return; } if(entryCount >= hashSize) { while (hashTypeNext(hi) != C_ERR) { addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY); addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE); } return; } static dictType dt = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ NULL, /* key destructor */ NULL, /* val destructor */ NULL /* allow to expand */ }; d = dictCreate(&dt,NULL); if(entryCount * HRANDMEMBER_SUB_STRATEGY_MUL > hashSize) { /* Add all the elements into the temporary dictionary. */ while((hashTypeNext(hi)) != C_ERR) { int ret = DICT_ERR; sds key, value; key = hashTypeCurrentFromHashTable(hi,OBJ_HASH_KEY); value = hashTypeCurrentFromHashTable(hi,OBJ_HASH_VALUE); ret = dictAdd(d, key, value); serverAssert(ret == DICT_OK); } serverAssert(dictSize(d) == hashSize); /* Remove random elements to reach the right count. */ while(hashSize > entryCount) { dictEntry *de; de = dictGetRandomKey(d); dictDelete(d,dictGetKey(de)); hashSize--; } } else { unsigned long added = 0; sds sdsKey, sdsVal; while(added < entryCount) { dictEntry *de = dictGetRandomKey(hash->ptr); sdsKey = dictGetKey(de); sdsVal = dictGetVal(de); /* Try to add the object to the dictionary. If it already exists * free it, otherwise increment the number of objects we have * in the result dictionary. */ if (dictAdd(d,sdsKey,sdsVal) == DICT_OK){ added++; } } } { dictIterator *di; dictEntry *de; di = dictGetIterator(d); while((de = dictNext(di)) != NULL) { sds key = dictGetKey(de); sds value = dictGetVal(de); addReplyBulkCBuffer(c,key,sdslen(key)); addReplyBulkCBuffer(c,value,sdslen(value)); } dictReleaseIterator(di); dictRelease(d); } } hashTypeReleaseIterator(hi); } ``` ## 參考文獻 [srandmember](https://redis.io/commands/srandmember) [redis原始碼](https://github.com/dewxi