Redis之列表物件原始碼閱讀
阿新 • • 發佈:2018-11-24
listTypeTryConversion:嘗試將列表物件轉換成linkedlist編碼
void listTypeTryConversion(robj *subject, robj *value) { // 確保 subject 為 ZIPLIST 編碼 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return; if (sdsEncodedObject(value) && // 看字串是否過長 sdslen(value->ptr) > server.list_max_ziplist_value) // 將編碼轉換為雙端連結串列 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); }
listTypePush:將value新增到列表物件的頭部或尾部
void listTypePush(robj *subject, robj *value, int where) { /* Check if we need to convert the ziplist */ // 是否需要轉換編碼? listTypeTryConversion(subject,value); if (subject->encoding == REDIS_ENCODING_ZIPLIST && ziplistLen(subject->ptr) >= server.list_max_ziplist_entries) listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST); // ZIPLIST if (subject->encoding == REDIS_ENCODING_ZIPLIST) { int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL; // 取出物件的值,因為 ZIPLIST 只能儲存字串或整數 value = getDecodedObject(value); subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos); decrRefCount(value); // 雙端連結串列 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { if (where == REDIS_HEAD) { listAddNodeHead(subject->ptr,value); } else { listAddNodeTail(subject->ptr,value); } incrRefCount(value); // 未知編碼 } else { redisPanic("Unknown list encoding"); } }
listTypePop:彈出一個物件
robj *listTypePop(robj *subject, int where) { robj *value = NULL; // ZIPLIST if (subject->encoding == REDIS_ENCODING_ZIPLIST) { unsigned char *p; unsigned char *vstr; unsigned int vlen; long long vlong; // 決定彈出元素的位置 int pos = (where == REDIS_HEAD) ? 0 : -1; p = ziplistIndex(subject->ptr,pos); if (ziplistGet(p,&vstr,&vlen,&vlong)) { // 為被彈出元素建立物件 if (vstr) { value = createStringObject((char*)vstr,vlen); } else { value = createStringObjectFromLongLong(vlong); } /* We only need to delete an element when it exists */ // 從 ziplist 中刪除被彈出元素 subject->ptr = ziplistDelete(subject->ptr,&p); } // 雙端連結串列 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) { list *list = subject->ptr; listNode *ln; if (where == REDIS_HEAD) { ln = listFirst(list); } else { ln = listLast(list); } // 刪除被彈出節點 if (ln != NULL) { value = listNodeValue(ln); incrRefCount(value); listDelNode(list,ln); } // 未知編碼 } else { redisPanic("Unknown list encoding"); } // 返回節點物件 return value; }
listTypeLength:返回節點的數量
unsigned long listTypeLength(robj *subject) {
// ZIPLIST
if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
return ziplistLen(subject->ptr);
// 雙端連結串列
} else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
return listLength((list*)subject->ptr);
// 未知編碼
} else {
redisPanic("Unknown list encoding");
}
}
pushGenericCommand:根據where將value新增到頭部或尾部
void pushGenericCommand(redisClient *c, int where) {
int j, waiting = 0, pushed = 0;
// 取出列表物件
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
// 如果列表物件不存在,那麼可能有客戶端在等待這個鍵的出現
int may_have_waiting_clients = (lobj == NULL);
if (lobj && lobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
// 將列表狀態設定為就緒
if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
// 遍歷所有輸入值,並將它們新增到列表中
for (j = 2; j < c->argc; j++) {
// 編碼值
c->argv[j] = tryObjectEncoding(c->argv[j]);
// 如果列表物件不存在,那麼建立一個,並關聯到資料庫
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
}
// 將值推入到列表
listTypePush(lobj,c->argv[j],where);
pushed++;
}
// 返回新增的節點數量
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
// 如果至少有一個元素被成功推入,那麼執行以下程式碼
if (pushed) {
char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
// 傳送鍵修改訊號
signalModifiedKey(c->db,c->argv[1]);
// 傳送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
void lpushCommand(redisClient *c) {
pushGenericCommand(c,REDIS_HEAD);
}
void rpushCommand(redisClient *c) {
pushGenericCommand(c,REDIS_TAIL);
}
pushxGenericCommand:將值插入到某個值的前面或後面
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
robj *subject;
listTypeIterator *iter;
listTypeEntry entry;
int inserted = 0;
// 取出列表物件
if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
checkType(c,subject,REDIS_LIST)) return;
// 執行的是 LINSERT 命令
if (refval != NULL) {
// 看儲存值 value 是否需要將列表編碼轉換為雙端連結串列
listTypeTryConversion(subject,val);
/* Seek refval from head to tail */
// 在列表中查詢 refval 物件
iter = listTypeInitIterator(subject,0,REDIS_TAIL);
while (listTypeNext(iter,&entry)) {
if (listTypeEqual(&entry,refval)) {
// 找到了,將值插入到節點的前面或後面
listTypeInsert(&entry,val,where);
inserted = 1;
break;
}
}
listTypeReleaseIterator(iter);
if (inserted) {
/* Check if the length exceeds the ziplist length threshold. */
// 檢視插入之後是否需要將編碼轉換為雙端連結串列
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"linsert",
c->argv[1],c->db->id);
server.dirty++;
} else {
/* Notify client of a failed insert */
// refval 不存在,插入失敗
addReply(c,shared.cnegone);
return;
}
// 執行的是 LPUSHX 或 RPUSHX 命令
} else {
char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
listTypePush(subject,val,where);
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
server.dirty++;
}
addReplyLongLong(c,listTypeLength(subject));
}
blpopCommand:阻塞彈出命令
void blpopCommand(redisClient *c) {
blockingPopGenericCommand(c,REDIS_HEAD);
}
blockingPopGenericCommand:遍歷所有輸入的列表鍵,只要有一個列表鍵可以pop,就返回,否則如果都不能pop,就阻塞
void blockingPopGenericCommand(redisClient *c, int where) {
robj *o;
mstime_t timeout;
int j;
// 取出 timeout 引數
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= REDIS_OK) return;
// 遍歷所有列表鍵
for (j = 1; j < c->argc-1; j++) {
// 取出列表鍵
o = lookupKeyWrite(c->db,c->argv[j]);
// 有非空列表?
if (o != NULL) {
if (o->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
// 非空列表
if (listTypeLength(o) != 0) {
/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
// 彈出值
robj *value = listTypePop(o,where);
redisAssert(value != NULL);
// 回覆客戶端
addReplyMultiBulkLen(c,2);
// 回覆彈出元素的列表
addReplyBulk(c,c->argv[j]);
// 回覆彈出值
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
c->argv[j],c->db->id);
// 刪除空列表
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
/* Replicate it as an [LR]POP instead of B[LR]POP. */
// 傳播一個 [LR]POP 而不是 B[LR]POP
rewriteClientCommandVector(c,2,
(where == REDIS_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
// 如果命令在一個事務中執行,那麼為了不產生死等待
// 伺服器只能向客戶端傳送一個空回覆
if (c->flags & REDIS_MULTI) {
addReply(c,shared.nullmultibulk);
return;
}
/* If the list is empty or the key does not exists we must block */
// 所有輸入列表鍵都不存在,只能阻塞了
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
blockForKeys:阻塞客戶端
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
// 設定阻塞狀態的超時和目標選項
c->bpop.timeout = timeout;
// target 在執行 RPOPLPUSH 命令時使用
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
// 關聯阻塞客戶端和鍵的相關資訊
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dict ignore it. */
// c->bpop.keys 是一個集合(值為 NULL 的字典)
// 它記錄所有造成客戶端阻塞的鍵
// 以下語句在鍵不存在於集合的時候,將它新增到集合
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
// c->db->blocking_keys 字典的鍵為造成客戶端阻塞的鍵
// 而值則是一個連結串列,連結串列中包含了所有被阻塞的客戶端
// 以下程式將阻塞鍵和被阻塞客戶端關聯起來
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
// 連結串列不存在,新建立一個,並將它關聯到字典中
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
redisAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
l = dictGetVal(de);
}
// 將客戶端填接到被阻塞客戶端的連結串列中
listAddNodeTail(l,c);
}
blockClient(c,REDIS_BLOCKED_LIST);
}
signalListAsReady:如果有客戶端因為這個鍵被阻塞,他會被放到ready_keys 中儲存
void signalListAsReady(redisClient *c, robj *key) {
readyList *rl;
/* No clients blocking for this key? No need to queue it. */
// 沒有客戶端被這個鍵阻塞,直接返回
if (dictFind(c->db->blocking_keys,key) == NULL) return;
/* Key was already signaled? No need to queue it again. */
// 這個鍵已經被新增到 ready_keys 中了,直接返回
if (dictFind(c->db->ready_keys,key) != NULL) return;
/* Ok, we need to queue this key into server.ready_keys. */
// 建立一個 readyList 結構,儲存鍵和資料庫
// 然後將 readyList 新增到 server.ready_keys 中
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = c->db;
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
/* We also add the key in the db->ready_keys dictionary in order
* to avoid adding it multiple times into a list with a simple O(1)
* check.
*
* 將 key 新增到 c->db->ready_keys 集合中,防止重複新增
*/
incrRefCount(key);
redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
}
handleClientsBlockedOnLists: 對所有被阻塞在某個客戶端的 key 來說,只要這個 key 被執行了某種 PUSH 操作 那麼這個 key 就會被放到 serve.ready_keys 去。 這個函式會遍歷整個 serve.ready_keys 連結串列, 並將裡面的 key 的元素彈出給被阻塞客戶端, 從而解除客戶端的阻塞狀態。
void handleClientsBlockedOnLists(void) {
// 遍歷整個 ready_keys 連結串列
while(listLength(server.ready_keys) != 0) {
list *l;
/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalListAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BRPOPLPUSH. */
// 備份舊的 ready_keys ,再給伺服器端賦值一個新的
l = server.ready_keys;
server.ready_keys = listCreate();
while(listLength(l) != 0) {
// 取出 ready_keys 中的首個連結串列節點
listNode *ln = listFirst(l);
// 指向 readyList 結構
readyList *rl = ln->value;
/* First of all remove this key from db->ready_keys so that
* we can safely call signalListAsReady() against this key. */
// 從 ready_keys 中移除就緒的 key
dictDelete(rl->db->ready_keys,rl->key);
/* If the key exists and it's a list, serve blocked clients
* with data. */
// 獲取鍵物件,這個物件應該是非空的,並且是列表
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == REDIS_LIST) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
// 取出所有被這個 key 阻塞的客戶端
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
// 取出客戶端
listNode *clientnode = listFirst(clients);
redisClient *receiver = clientnode->value;
// 設定彈出的目標物件(只在 BRPOPLPUSH 時使用)
robj *dstkey = receiver->bpop.target;
// 從列表中彈出元素
// 彈出的位置取決於是執行 BLPOP 還是 BRPOP 或者 BRPOPLPUSH
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
REDIS_HEAD : REDIS_TAIL;
robj *value = listTypePop(o,where);
// 還有元素可彈出(非 NULL)
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
// 取消客戶端的阻塞狀態
unblockClient(receiver);
// 將值 value 推入到造成客戶度 receiver 阻塞的 key 上
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == REDIS_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
// 如果執行到這裡,表示還有至少一個客戶端被鍵所阻塞
// 這些客戶端要等待對鍵的下次 PUSH
break;
}
}
}
// 如果列表元素已經為空,那麼從資料庫中將它刪除
if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}