1. 程式人生 > >Redis之列表物件原始碼閱讀

Redis之列表物件原始碼閱讀

listTypeTryConversion:嘗試將列表物件轉換成linkedlist編碼

/* Switch `subject` to linked-list encoding when `value` is too long for a
 * ziplist.
 *
 * Only ziplist-encoded subjects are considered. The conversion happens when
 * `value` is an sds-encoded object whose string length exceeds
 * server.list_max_ziplist_value; otherwise the subject is left untouched. */
void listTypeTryConversion(robj *subject, robj *value) {
    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        /* Objects that are not sds-encoded never trigger the conversion. */
        if (!sdsEncodedObject(value)) return;

        /* String longer than the configured per-entry limit? */
        if (sdslen(value->ptr) > server.list_max_ziplist_value) {
            listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
        }
    }
}

listTypePush:將value新增到列表物件的頭部或尾部

/* Push `value` onto the list object `subject`.
 *
 * `where` selects the end of the list: REDIS_HEAD pushes at the head, any
 * other value pushes at the tail.
 *
 * Before pushing, a ziplist-encoded subject is converted to a linked list
 * if `value` is too long (see listTypeTryConversion) or if the ziplist
 * already holds server.list_max_ziplist_entries entries or more.
 *
 * In the linked-list case the list stores `value` itself and its reference
 * count is incremented. Panics on an unknown encoding. */
void listTypePush(robj *subject, robj *value, int where) {
    /* Conversion triggered by the size of the new value. */
    listTypeTryConversion(subject,value);

    /* Conversion triggered by the number of entries already stored. */
    if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
        ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
    {
        listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
    }

    switch (subject->encoding) {
    case REDIS_ENCODING_ZIPLIST: {
        int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
        /* Work on a decoded object so that its ptr can be handed to
         * ziplistPush() as an sds string; the reference obtained here is
         * dropped right after the push. */
        robj *decoded = getDecodedObject(value);

        subject->ptr = ziplistPush(subject->ptr,decoded->ptr,sdslen(decoded->ptr),pos);
        decrRefCount(decoded);
        break;
    }

    case REDIS_ENCODING_LINKEDLIST:
        if (where == REDIS_HEAD)
            listAddNodeHead(subject->ptr,value);
        else
            listAddNodeTail(subject->ptr,value);
        /* The list node now references the object. */
        incrRefCount(value);
        break;

    default:
        redisPanic("Unknown list encoding");
    }
}

listTypePop:彈出一個物件

/* Remove one element from the list object `subject` and return it.
 *
 * `where` selects the end to pop from: REDIS_HEAD pops the first element,
 * any other value pops the last one.
 *
 * Returns NULL when there is nothing to pop. For a ziplist the returned
 * object is newly created from the stored string or integer; for a linked
 * list it is the stored object with its reference count incremented before
 * the node is deleted. Panics on an unknown encoding. */
robj *listTypePop(robj *subject, int where) {
    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *entry;
        unsigned char *str;
        unsigned int len;
        long long num;
        robj *popped;

        /* Index 0 is the first entry, -1 the last one. */
        entry = ziplistIndex(subject->ptr,(where == REDIS_HEAD) ? 0 : -1);
        if (!ziplistGet(entry,&str,&len,&num)) return NULL;

        /* Build an object for the entry: a string when `str` is set,
         * otherwise one created from the integer value. */
        if (str) {
            popped = createStringObject((char*)str,len);
        } else {
            popped = createStringObjectFromLongLong(num);
        }

        /* We only need to delete an element when it exists. */
        subject->ptr = ziplistDelete(subject->ptr,&entry);
        return popped;
    }

    if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
        list *ll = subject->ptr;
        listNode *node = (where == REDIS_HEAD) ? listFirst(ll) : listLast(ll);
        robj *popped;

        if (node == NULL) return NULL;

        /* Take a reference before the node is removed from the list. */
        popped = listNodeValue(node);
        incrRefCount(popped);
        listDelNode(ll,node);
        return popped;
    }

    redisPanic("Unknown list encoding");
    return NULL;
}

listTypeLength:返回節點的數量

/* Return the number of elements stored in the list object `subject`.
 * Panics on an unknown encoding. */
unsigned long listTypeLength(robj *subject) {
    switch (subject->encoding) {
    case REDIS_ENCODING_ZIPLIST:
        return ziplistLen(subject->ptr);

    case REDIS_ENCODING_LINKEDLIST:
        return listLength((list*)subject->ptr);

    default:
        redisPanic("Unknown list encoding");
    }
}

pushGenericCommand:根據where將value新增到頭部或尾部

/* Shared implementation of the push commands: appends every value in
 * c->argv[2..argc-1] to the list stored at key c->argv[1], at the end
 * selected by `where` (REDIS_HEAD or REDIS_TAIL).
 *
 * - Replies with a wrong-type error if the key holds a non-list value.
 * - Creates the list (ziplist encoded) on the first push if it is missing.
 * - Replies with the length of the list after all pushes.
 * - Emits the "lpush"/"rpush" keyspace event and signals the key as
 *   modified when at least one value was pushed. */
void pushGenericCommand(redisClient *c, int where) {
    int idx, added = 0;
    robj *key = c->argv[1];
    robj *list_obj = lookupKeyWrite(c->db,key);

    if (list_obj == NULL) {
        /* The key does not exist yet, so clients may be blocked waiting
         * for it: mark it as ready before the values are pushed. */
        signalListAsReady(c,key);
    } else if (list_obj->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (idx = 2; idx < c->argc; idx++) {
        /* Try to store the value in a more compact encoding. */
        c->argv[idx] = tryObjectEncoding(c->argv[idx]);

        /* Lazily create the list and attach it to the database. */
        if (list_obj == NULL) {
            list_obj = createZiplistObject();
            dbAdd(c->db,key,list_obj);
        }

        listTypePush(list_obj,c->argv[idx],where);
        added++;
    }

    /* The reply is the list length after the push, not the number of
     * values added by this call. */
    addReplyLongLong(c, list_obj ? listTypeLength(list_obj) : 0);

    if (added != 0) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,key);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,key,c->db->id);
    }

    server.dirty += added;
}
/* Push entry point for the head of the list: delegates to
 * pushGenericCommand() with REDIS_HEAD. */
void lpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_HEAD);
}

/* Push entry point for the tail of the list: delegates to
 * pushGenericCommand() with REDIS_TAIL. */
void rpushCommand(redisClient *c) {
    pushGenericCommand(c,REDIS_TAIL);
}

pushxGenericCommand:refval 不為 NULL 時(LINSERT),將值插入到 refval 的前面或後面;refval 為 NULL 時(LPUSHX/RPUSHX),將值推入到已存在列表的頭部或尾部

/* Shared implementation of LINSERT and LPUSHX/RPUSHX on the existing list
 * stored at key c->argv[1].
 *
 * refval != NULL (LINSERT): scan the list from head to tail for the first
 *   element equal to `refval` and insert `val` next to it, on the side
 *   selected by `where`. Replies -1 (shared.cnegone) if `refval` is not
 *   found.
 * refval == NULL (LPUSHX/RPUSHX): push `val` at the end selected by `where`.
 *
 * If the key does not exist the reply is 0 (shared.czero); if it holds a
 * non-list value checkType() replies and nothing is done. On success the
 * reply is the new length of the list. */
void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
    robj *subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero);

    if (subject == NULL) return;
    if (checkType(c,subject,REDIS_LIST)) return;

    if (refval == NULL) {
        /* LPUSHX / RPUSHX */
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";

        listTypePush(subject,val,where);
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
        server.dirty++;
    } else {
        /* LINSERT */
        listTypeIterator *iter;
        listTypeEntry entry;
        int found = 0;

        /* The new value may be too long for a ziplist: convert first. */
        listTypeTryConversion(subject,val);

        /* Seek refval from head to tail, stopping at the first match. */
        iter = listTypeInitIterator(subject,0,REDIS_TAIL);
        while (!found && listTypeNext(iter,&entry)) {
            if (listTypeEqual(&entry,refval)) {
                listTypeInsert(&entry,val,where);
                found = 1;
            }
        }
        listTypeReleaseIterator(iter);

        if (!found) {
            /* Notify client of a failed insert */
            addReply(c,shared.cnegone);
            return;
        }

        /* Check if the length exceeds the ziplist length threshold. */
        if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
            ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
        {
            listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
        }

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"linsert",
                            c->argv[1],c->db->id);
        server.dirty++;
    }

    addReplyLongLong(c,listTypeLength(subject));
}

blpopCommand:阻塞彈出命令

/* Blocking pop entry point for the head of the list: delegates to
 * blockingPopGenericCommand() with REDIS_HEAD. */
void blpopCommand(redisClient *c) {
    blockingPopGenericCommand(c,REDIS_HEAD);
}

blockingPopGenericCommand:遍歷所有輸入的列表鍵,只要有一個列表鍵可以pop,就返回,否則如果都不能pop,就阻塞

/* Shared implementation of the blocking pop commands.
 *
 * c->argv[1..argc-2] are the keys to inspect, c->argv[argc-1] is the
 * timeout (parsed in seconds). Keys are checked in order:
 *
 * - a key holding a non-list value produces a wrong-type error reply;
 * - the first non-empty list is popped from the end selected by `where`,
 *   the client receives a two-element reply (key, value) and the command
 *   vector is rewritten to a plain LPOP/RPOP on that key;
 * - if no key can be served, the client is blocked on all the keys, unless
 *   it is inside MULTI/EXEC, in which case a null multi bulk is returned. */
void blockingPopGenericCommand(redisClient *c, int where) {
    mstime_t timeout;
    int j;

    /* The last argument is the timeout. */
    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
        != REDIS_OK) return;

    for (j = 1; j < c->argc-1; j++) {
        robj *key = c->argv[j];
        robj *o = lookupKeyWrite(c->db,key);
        robj *value;
        char *event;

        /* Missing key: try the next one. */
        if (o == NULL) continue;

        if (o->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
            return;
        }

        /* Empty list: try the next key. */
        if (listTypeLength(o) == 0) continue;

        /* Non empty list, this is like a non normal [LR]POP. */
        event = (where == REDIS_HEAD) ? "lpop" : "rpop";
        value = listTypePop(o,where);
        redisAssert(value != NULL);

        /* Reply with the key name followed by the popped value. */
        addReplyMultiBulkLen(c,2);
        addReplyBulk(c,key);
        addReplyBulk(c,value);
        decrRefCount(value);

        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,key,c->db->id);

        /* Remove the key if the pop left the list empty. */
        if (listTypeLength(o) == 0) {
            dbDelete(c->db,key);
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",key,c->db->id);
        }

        signalModifiedKey(c->db,key);
        server.dirty++;

        /* Replicate it as an [LR]POP instead of B[LR]POP. */
        rewriteClientCommandVector(c,2,
            (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
            key);
        return;
    }

    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & REDIS_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* If the list is empty or the key does not exists we must block */
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

blockForKeys:阻塞客戶端

/* Block client `c` on the `numkeys` keys in `keys`.
 *
 * `timeout` and `target` are stored in c->bpop; a non-NULL `target` gets an
 * extra reference. Each key not already present in c->bpop.keys is added
 * there, and the client is appended to the list of clients kept for that
 * key in c->db->blocking_keys (the list is created on first use). Finally
 * the client is put into the REDIS_BLOCKED_LIST blocked state. */
void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;
    if (target != NULL) incrRefCount(target);

    for (j = 0; j < numkeys; j++) {
        robj *key = keys[j];
        dictEntry *de;
        list *l;

        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,key,NULL) != DICT_OK) continue;
        incrRefCount(key);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,key);
        if (de != NULL) {
            l = dictGetVal(de);
        } else {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blocking_keys,key,l);
            incrRefCount(key);
            redisAssertWithInfo(c,key,retval == DICT_OK);
        }

        /* Queue this client after the ones already blocked on the key. */
        listAddNodeTail(l,c);
    }

    blockClient(c,REDIS_BLOCKED_LIST);
}

signalListAsReady:如果有客戶端因為這個鍵被阻塞,這個鍵會被放到 server.ready_keys 連結串列以及 db->ready_keys 字典中儲存(已經在 ready_keys 中的鍵不會重複新增)

/* Queue `key` as "ready" if some client is blocked on it.
 *
 * Nothing is done when no client is blocked on the key in c->db, or when
 * the key is already recorded in c->db->ready_keys. Otherwise a readyList
 * entry (key + db) is appended to server.ready_keys and the key is also
 * added to c->db->ready_keys; each of the two containers holds its own
 * reference to the key. */
void signalListAsReady(redisClient *c, robj *key) {
    readyList *rl;

    /* No clients blocking for this key, or key already signaled?
     * No need to queue it. */
    if (dictFind(c->db->blocking_keys,key) == NULL ||
        dictFind(c->db->ready_keys,key) != NULL)
    {
        return;
    }

    /* Ok, we need to queue this key into server.ready_keys. */
    rl = zmalloc(sizeof(readyList));
    rl->db = c->db;
    rl->key = key;
    incrRefCount(key);
    listAddNodeTail(server.ready_keys,rl);

    /* We also add the key in the db->ready_keys dictionary in order
     * to avoid adding it multiple times into a list with a simple O(1)
     * check. */
    incrRefCount(key);
    redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
}

handleClientsBlockedOnLists: 對所有造成客戶端阻塞的 key 來說, 只要這個 key 被執行了某種 PUSH 操作, 那麼這個 key 就會被放到 server.ready_keys 中。 這個函式會遍歷整個 server.ready_keys 連結串列, 並將裡面的 key 的元素彈出給被阻塞的客戶端, 從而解除客戶端的阻塞狀態。

/* Drain server.ready_keys and serve the clients blocked on those keys.
 *
 * For every queued readyList entry whose key still exists and holds a list,
 * elements are popped and handed to the clients blocked on that key, in
 * the order they appear in the key's blocking_keys client list, until
 * either the clients or the elements run out. A list left empty is removed
 * from the database. The outer loop repeats because serving a client may
 * queue new keys into server.ready_keys. */
void handleClientsBlockedOnLists(void) {

    // Keep going until no ready key is left
    while(listLength(server.ready_keys) != 0) {
        list *l;

        /* Point server.ready_keys to a fresh list and save the current one
         * locally. This way as we run the old list we are free to call
         * signalListAsReady() that may push new elements in server.ready_keys
         * when handling clients blocked into BRPOPLPUSH. */
        l = server.ready_keys;
        server.ready_keys = listCreate();

        while(listLength(l) != 0) {

            // Take the first node of the saved ready list
            listNode *ln = listFirst(l);

            // Its value is a readyList structure (key + db)
            readyList *rl = ln->value;

            /* First of all remove this key from db->ready_keys so that
             * we can safely call signalListAsReady() against this key. */
            dictDelete(rl->db->ready_keys,rl->key);

            /* If the key exists and it's a list, serve blocked clients
             * with data. */
            robj *o = lookupKeyWrite(rl->db,rl->key);
            if (o != NULL && o->type == REDIS_LIST) {
                dictEntry *de;

                /* We serve clients in the same order they blocked for
                 * this key, from the first blocked to the last. */
                de = dictFind(rl->db->blocking_keys,rl->key);
                if (de) {
                    list *clients = dictGetVal(de);
                    // Snapshot the count up front: the loop runs at most
                    // once per client that was in the list at this point
                    int numclients = listLength(clients);

                    while(numclients--) {
                        // Always serve the client at the head of the list
                        listNode *clientnode = listFirst(clients);
                        redisClient *receiver = clientnode->value;

                        // Destination key stored when the client blocked
                        // (may be NULL)
                        robj *dstkey = receiver->bpop.target;

                        // Pop from the head only if the client's last
                        // command was BLPOP; otherwise pop from the tail
                        int where = (receiver->lastcmd &&
                                     receiver->lastcmd->proc == blpopCommand) ?
                                    REDIS_HEAD : REDIS_TAIL;
                        robj *value = listTypePop(o,where);

                        // An element was available (non NULL)
                        if (value) {
                            /* Protect receiver->bpop.target, that will be
                             * freed by the next unblockClient()
                             * call. */
                            if (dstkey) incrRefCount(dstkey);

                            // Take the client out of the blocked state
                            unblockClient(receiver);

                            // Hand the popped value to the receiver
                            if (serveClientBlockedOnList(receiver,
                                rl->key,dstkey,rl->db,value,
                                where) == REDIS_ERR)
                            {
                                /* If we failed serving the client we need
                                 * to also undo the POP operation. */
                                    listTypePush(o,value,where);
                            }

                            // Drop the references taken above
                            if (dstkey) decrRefCount(dstkey);
                            decrRefCount(value);
                        } else {
                            // Nothing left to pop: the remaining clients
                            // stay blocked until the next push on the key
                            break;
                        }
                    }
                }
                
                // Remove the list from the database once it is empty
                if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
                /* We don't call signalModifiedKey() as it was already called
                 * when an element was pushed on the list. */
            }

            /* Free this item. */
            decrRefCount(rl->key);
            zfree(rl);
            listDelNode(l,ln);
        }
        listRelease(l); /* We have the new list on place at this point. */
    }
}