魔改redis之新增命令hrandmember
阿新 • • 發佈:2020-12-28
# 魔改redis之新增命令hrandmember
[toc]
## 正文
### 前言
想從`redis`的`hash`表獲取隨機的鍵值對,但是發現`redis`只支援`set`的隨機值[SRANDMEMBER](https://redis.io/commands/srandmember)。但是如果把`hash`表中的資料又存一份,佔用的空間又太大。也可以通過先`HLEN`獲取`hash`表的大小,隨機出一個偏移值,再呼叫`HSCAN`獲得一組資料。或者直接多次隨機,多次取值。但這樣效率始終不如`SRANDMEMBER`(`redis`的開銷主要是網路的開銷)。於是想到魔改`redis`程式碼,使其支援對`hash`表的隨機鍵值對。客戶端`jedis`打算使用`eval`來呼叫`redis`中新加入的指令。
### Set型別與srandmember命令
`Set`型別的編碼可以是`OBJ_ENCODING_HT`或者`OBJ_ENCODING_INTSET`,如果集合中的值全是數值,那麼`Set`的編碼(底層型別)為`OBJ_ENCODING_INTSET`, 如果加入了無法被解析為數值的字串,或者`set`的大小超過了`OBJ_SET_MAX_INTSET_ENTRIES`預設`512`,編碼則會變更為`OBJ_ENCODING_HT`。
`OBJ_ENCODING_INTSET`就是儲存著整數的有序陣列。加入新值時新`realloc`新增記憶體,再使用`memmove`將對應位置後的資料後移,然後在對應的位置加入值。
`OBJ_ENCODING_HT`編碼就是`dict`型別,也就是字典。
`srandmember`命令的主要處理函式是`srandmemberWithCountCommand`,如果傳入的`count`值是負數,意味著值可以重複。
1. 如果值可以重複,那麼每次隨機取出一個成員。
2. 如果`set`的`size`小於請求的數量,則返回`set`集合中全部的值。
```c
//case 1
if (!uniq) {
addReplyMultiBulkLen(c,count);
while(count--) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
addReplyBulkLongLong(c,llele);
} else {
addReplyBulkCBuffer(c,ele,sdslen(ele));
}
}
return;
}
//case 2
if (count >= size) {
sunionDiffGenericCommand(c,c->argv+1,1,NULL,SET_OP_UNION);
return;
}
```
3. 集合的數量沒有遠遠大於請求的數量。將`set`的值複製到`dict`中,然後隨機刪除值,直到數量等於請求的值。
4. 集合數量遠大請求的數量。隨機取值,加入`dict`中,數量滿足後返回`dict`中的值。
```C
if (count*SRANDMEMBER_SUB_STRATEGY_MUL > size) {
setTypeIterator *si;
/* Add all the elements into the temporary dictionary. */
si = setTypeInitIterator(set);
while((encoding = setTypeNext(si,&ele,&llele)) != -1) {
int retval = DICT_ERR;
if (encoding == OBJ_ENCODING_INTSET) {
retval = dictAdd(d,createStringObjectFromLongLong(llele),NULL);
} else {
retval = dictAdd(d,createStringObject(ele,sdslen(ele)),NULL);
}
serverAssert(retval == DICT_OK);
}
setTypeReleaseIterator(si);
serverAssert(dictSize(d) == size);
/* Remove random elements to reach the right count. */
while(size > count) {
dictEntry *de;
de = dictGetRandomKey(d);
dictDelete(d,dictGetKey(de));
size--;
}
}
else {
unsigned long added = 0;
robj *objele;
while(added < count) {
encoding = setTypeRandomElement(set,&ele,&llele);
if (encoding == OBJ_ENCODING_INTSET) {
objele = createStringObjectFromLongLong(llele);
} else {
objele = createStringObject(ele,sdslen(ele));
}
/* Try to add the object to the dictionary. If it already exists
* free it, otherwise increment the number of objects we have
* in the result dictionary. */
if (dictAdd(d,objele,NULL) == DICT_OK)
added++;
else
decrRefCount(objele);
}
}
/* CASE 3 & 4: send the result to the user. */
{
dictIterator *di;
dictEntry *de;
addReplyMultiBulkLen(c,count);
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL)
addReplyBulk(c,dictGetKey(de));
dictReleaseIterator(di);
dictRelease(d);
}
```
### Hash型別對比Set型別
Hash型別和Set型別的關係非常密切,在`java`原始碼中,往往`set`型別就是由`hash`型別實現的。在`redis`中在資料量較大的時候也十分相似。
前文提到 `Set`型別的編碼可以是`intset`或者是`dict`,`ziplist`的編碼是`ziplist`或者是`dict`。在當前的redis版本中,還並沒有新增`hrandmember`命令(6.2及之前)。
`ziplist`中的字串長度超過`OBJ_HASH_MAX_ZIPLIST_VALUE`(預設值為64),或者`entry`的個數超過`OBJ_HASH_MAX_ZIPLIST_ENTRIES`(預設值為512),則會轉化為`hashtable`編碼。
`ziplist`的`encoding`就是嘗試將字串值解析成`long`並儲存編碼。`hash`型別和 `set`型別最大的區別在於元素個數較少時,內部的編碼不同,`hash`內部的編碼是`ziplist`,而`set`的內部編碼是`intset`,(個人認為`hash`和`intset`內部編碼不統一是一處失誤,使用者對兩者有著相似的用法,也就是需求類似,然而底層實現卻不同,必然導致程式碼的重複,也確實如此,`redis`團隊似乎因為這個原因遲遲沒有新增`hrandmember`命令)
### hrandmember命令
因為`ziplist`不能被隨機訪問。對於`ziplist`編碼的`hash`表,我們採用以下演算法,來保證每個被取出來的`entry`的概率是一樣的。
**我們從長度為m的`ziplist`中取出n個entry,m>=n,設剩下的長度為m left ,剩餘要取的個數為nleft,每次取球時,我們取它的概率為 nleft/m left 。**
這樣能保證每個球被取出的概率相同,為`n/m`。可用數學歸納法證明。
通過使用這種方式,我們將時間複雜度從O(nm)降為O(m)。
處理`hash`編碼,我們複製`srandmember`的程式碼,並稍作修改,避免字串的複製以提高效率。
注意:當編碼為`ziplist`時,不支援負數的count。雖然也有返回值,但並不會重複,並且個數小於期望值。
```c
void hrandmemberWithCountCommand(client *c, long l) {
unsigned long entryCount, hashSize;
int uniq = 1;
hashTypeIterator *hi;
robj *hash;
dict *d;
double randomDouble;
double threshold;
unsigned long index = 0;
if ((hash = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp]))
== NULL || checkType(c,hash,OBJ_HASH)) return;
if(l >= 0) {
entryCount = (unsigned long) l;
} else {
entryCount = -l;
uniq = 0;
}
hashSize = hashTypeLength(hash);
if(entryCount > hashSize)
entryCount = hashSize;
addReplyMapLen(c, entryCount);
hi = hashTypeInitIterator(hash);
if(hash->encoding == OBJ_ENCODING_ZIPLIST) {
while (hashTypeNext(hi) != C_ERR && entryCount != 0) {
randomDouble = ((double)rand()) / RAND_MAX;
threshold = ((double)entryCount) / (hashSize - index);
if(randomDouble < threshold){
entryCount--;
addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
}
index ++;
}
} else {
// copy of srandmember
if(!uniq) {
while(entryCount--) {
sds key, value;
dictEntry *de = dictGetRandomKey(hash->ptr);
key = dictGetKey(de);
value = dictGetVal(de);
addReplyBulkCBuffer(c,key,sdslen(key));
addReplyBulkCBuffer(c,value,sdslen(value));
}
return;
}
if(entryCount >= hashSize) {
while (hashTypeNext(hi) != C_ERR) {
addHashIteratorCursorToReply(c, hi, OBJ_HASH_KEY);
addHashIteratorCursorToReply(c, hi, OBJ_HASH_VALUE);
}
return;
}
static dictType dt = {
dictSdsHash, /* hash function */
NULL, /* key dup */
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
NULL, /* key destructor */
NULL, /* val destructor */
NULL /* allow to expand */
};
d = dictCreate(&dt,NULL);
if(entryCount * HRANDMEMBER_SUB_STRATEGY_MUL > hashSize) {
/* Add all the elements into the temporary dictionary. */
while((hashTypeNext(hi)) != C_ERR) {
int ret = DICT_ERR;
sds key, value;
key = hashTypeCurrentFromHashTable(hi,OBJ_HASH_KEY);
value = hashTypeCurrentFromHashTable(hi,OBJ_HASH_VALUE);
ret = dictAdd(d, key, value);
serverAssert(ret == DICT_OK);
}
serverAssert(dictSize(d) == hashSize);
/* Remove random elements to reach the right count. */
while(hashSize > entryCount) {
dictEntry *de;
de = dictGetRandomKey(d);
dictDelete(d,dictGetKey(de));
hashSize--;
}
}
else {
unsigned long added = 0;
sds sdsKey, sdsVal;
while(added < entryCount) {
dictEntry *de = dictGetRandomKey(hash->ptr);
sdsKey = dictGetKey(de);
sdsVal = dictGetVal(de);
/* Try to add the object to the dictionary. If it already exists
* free it, otherwise increment the number of objects we have
* in the result dictionary. */
if (dictAdd(d,sdsKey,sdsVal) == DICT_OK){
added++;
}
}
}
{
dictIterator *di;
dictEntry *de;
di = dictGetIterator(d);
while((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de);
sds value = dictGetVal(de);
addReplyBulkCBuffer(c,key,sdslen(key));
addReplyBulkCBuffer(c,value,sdslen(value));
}
dictReleaseIterator(di);
dictRelease(d);
}
}
hashTypeReleaseIterator(hi);
}
```
## 參考文獻
[srandmember](https://redis.io/commands/srandmember)
[redis原始碼](https://github.com/dewxi