1. 程式人生 > >redis原始碼分析與思考(十四)——列表型別的命令實現(t_list.c)

redis原始碼分析與思考(十四)——列表型別的命令實現(t_list.c)

    列表型別是用來存貯多個字串物件的結構。一個列表可以存貯232-1個元素,可以對列表兩端進行插入(push)、彈出(pop),還可以獲取指定範圍內的元素列表、獲取指定索引的元素等等,它可以靈活的充當棧和佇列的角色。下面列出列表的命令:

列表命令

操作說明 命令 時間複雜度
向尾部新增 rpush key value [value…] O(n)
向頭部新增 lpush key value [value…] O(n)
插入在指定鍵前/後 linsert key before/after pivot value O(n)
返回範圍內的鍵 lrange key start end O(n)
返回索引的鍵 lindex key index O(n)
返回列表的長度 llen key index O(1)
彈出頭部節點 lpop key O(1)
彈出尾部節點 rpop key O(1)
刪除指定cout數目且值等於value的鍵 lremkey count value O(n)
刪除指定範圍的鍵 ltrim key start end O(n)
修改指定索引的鍵值 lset key index value O(n)
阻塞操作 blpop、brpop O(1)

編碼的轉換

    上次在介紹redis物件時提到過,列表物件的編碼格式會在一定情況下從ziplist轉變成adlist連結串列。列表每次push元素時,都會檢測是否滿足這一情況:

/*
 * 對輸入值 value 進行檢查,看是否需要將 subject 從 ziplist 轉換為雙端連結串列,
 * 以便儲存值 value 。
 * 函式只對 REDIS_ENCODING_RAW 編碼的 value 進行檢查,
 * 因為整數編碼的值不可能超長。
 */
 //server.list_max_ziplist_value是伺服器的屬性,可更改,預設64位元組
void listTypeTryConversion(robj *subject, robj *value) {
    // 確保 subject 為 ZIPLIST 編碼
    if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
    if (sdsEncodedObject(value) &&
        // 看字串是否過長
        sdslen(value->ptr) > server.list_max_ziplist_value)
            // 將編碼轉換為雙端連結串列
            listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}

/*
 * 將列表的底層編碼從 ziplist 轉換成雙端連結串列
 */
void listTypeConvert(robj *subject, int enc) {
//定義一個連結串列的迭代器
    listTypeIterator *li;
    listTypeEntry entry;
    redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
    // 轉換成雙端連結串列
    if (enc == REDIS_ENCODING_LINKEDLIST) {
        list *l = listCreate();
        //設定連結串列的釋放函式
        listSetFreeMethod(l,decrRefCountVoid);
        /* listTypeGet returns a robj with incremented refcount */
        // 遍歷 ziplist ,並將裡面的值全部新增到雙端連結串列中
        li = listTypeInitIterator(subject,0,REDIS_TAIL);
        while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
        listTypeReleaseIterator(li);
        // 更新編碼
        subject->encoding = REDIS_ENCODING_LINKEDLIST;
        // 釋放原來的 ziplist
        zfree(subject->ptr);
        // 更新物件值指標
        subject->ptr = l;
    } else {
        redisPanic("Unsupported list conversion");
    }
}

    也就是說,當列表裡插入的元素位元組長大於64位元組時,列表會啟動編碼轉換程式。

列表的新增

    不僅是插入的元素位元組長大於預設的64位元組會激動編碼轉換,新增元素的底層實現,在插入之前會檢測列表的總長度是否大於預設ziplist最大長度也會引起編碼轉換,預設長度是512個元素,可以在伺服器的配置中更改,列表插入底層實現如下:

//server.list_max_ziplist_entries表示最大長度,where表示插入到表頭還是表尾
void listTypePush(robj *subject, robj *value, int where) {
    /* Check if we need to convert the ziplist */
    // 是否需要轉換編碼?
    listTypeTryConversion(subject,value);
    if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
        ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
            listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
    // ZIPLIST
    if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
        int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
        // 取出物件的值,因為 ZIPLIST 只能儲存字串或整數
        value = getDecodedObject(value);
        subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
        decrRefCount(value);
    // 雙端連結串列
    } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
        if (where == REDIS_HEAD) {
            listAddNodeHead(subject->ptr,value);
        } else {
            listAddNodeTail(subject->ptr,value);
        }
        incrRefCount(value);
    // 未知編碼
    } else {
        redisPanic("Unknown list encoding");
    }
}


/*
 * 將物件 value 插入到列表節點的之前或之後。
 * where 引數決定了插入的位置:
 *  - REDIS_HEAD 插入到節點之前
 *  - REDIS_TAIL 插入到節點之後
 */
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
    robj *subject = entry->li->subject;
    // 插入到 ZIPLIST
    if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
        // 返回物件未編碼的值
        value = getDecodedObject(value);
        if (where == REDIS_TAIL) {
            unsigned char *next = ziplistNext(subject->ptr,entry->zi);
            /* When we insert after the current element, but the current element
             * is the tail of the list, we need to do a push. */
            if (next == NULL) {
                // next 是表尾節點,push 新節點到表尾
                subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
            } else {
                // 插入到到節點之後
                subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
            }
        } else {
            subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
        }
        decrRefCount(value);
    // 插入到雙端連結串列
    } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
        if (where == REDIS_TAIL) {
            listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
        } else {
            listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
        }
        incrRefCount(value);
    } else {
        redisPanic("Unknown list encoding");
    }
}

    PUSH系列命令的底層實現:

void pushGenericCommand(redisClient *c, int where) {
    int j, waiting = 0, pushed = 0;
    // 取出列表物件
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
    // 如果列表物件不存在,那麼可能有客戶端在等待這個鍵的出現
    int may_have_waiting_clients = (lobj == NULL);
    if (lobj && lobj->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }
    // 將列表狀態設定為就緒
    if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
    // 遍歷所有輸入值,並將它們新增到列表中
    for (j = 2; j < c->argc; j++) {
        // 編碼值
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        // 如果列表物件不存在,那麼建立一個,並關聯到資料庫
        if (!lobj) {
            lobj = createZiplistObject();
            dbAdd(c->db,c->argv[1],lobj);
        }
        // 將值推入到列表
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    // 返回新增的節點數量
    addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
    // 如果至少有一個元素被成功推入,那麼執行以下程式碼
    if (pushed) {
        char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
        // 傳送鍵修改訊號
        signalModifiedKey(c->db,c->argv[1]);
        // 傳送事件通知
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}

索引

    索引的策略很簡單,判斷是哪種編碼格式呼叫哪種遍歷:

void lindexCommand(redisClient *c) {
    robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    long index;
    robj *value = NULL;
    // 取出整數值物件 index
    if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
        return;
    // 根據索引,遍歷 ziplist ,直到指定位置
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *p;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        p = ziplistIndex(o->ptr,index);
        if (ziplistGet(p,&vstr,&vlen,&vlong)) {
            if (vstr) {
                value = createStringObject((char*)vstr,vlen);
            } else {
                value = createStringObjectFromLongLong(vlong);
            }
            addReplyBulk(c,value);
            decrRefCount(value);
        } else {
            addReply(c,shared.nullbulk);
        }
    // 根據索引,遍歷雙端連結串列,直到指定位置
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        listNode *ln = listIndex(o->ptr,index);
        if (ln != NULL) {
            value = listNodeValue(ln);
            addReplyBulk(c,value);
        } else {
            addReply(c,shared.nullbulk);
        }
    } else {
        redisPanic("Unknown list encoding");
    }
}

pop操作

    pop操作其實就是將佇列的pop操作而已,彈出元素後如果列表為空,則刪除該列表:

void popGenericCommand(redisClient *c, int where) {
    // 取出列表物件
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,REDIS_LIST)) return;
    // 彈出列表元素
    robj *value = listTypePop(o,where);
    // 根據彈出元素是否為空,決定後續動作
    if (value == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
//回覆給客戶端,彈出的value
        addReplyBulk(c,value);
        //引用計數減1
        decrRefCount(value);
        notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
        if (listTypeLength(o) == 0) {
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                                c->argv[1],c->db->id);
            dbDelete(c->db,c->argv[1]);
        }
        signalModifiedKey(c->db,c->argv[1]);
        server.dirty++;
    }
}

裁剪操作

//LTRIM命令
void ltrimCommand(redisClient *c) {
    robj *o;
    long start, end, llen, j, ltrim, rtrim;
    list *list;
    listNode *ln;
    // 取出索引值 start 和 end
    if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
        (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
    // 取出列表物件
    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
        checkType(c,o,REDIS_LIST)) return;
    // 列表長度
    llen = listTypeLength(o);
    /* convert negative indexes */
    // 將負數索引轉換成正數索引
    if (start < 0) start = llen+start;
    if (end < 0) end = llen+end;
    if (start < 0) start = 0;
    /* Invariant: start >= 0, so this test will be true when end < 0.
     * The range is empty when start > end or start >= length. */
    if (start > end || start >= llen) {
        /* Out of range start or start > end result in empty list */
        ltrim = llen;
        rtrim = 0;
    } else {
        if (end >= llen) end = llen-1;
        ltrim = start;
        rtrim = llen-end-1;
    }
    /* Remove list elements to perform the trim */
    // 刪除指定列表兩端的元素
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        // 刪除左端元素
        o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
        // 刪除右端元素
        o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
    } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
        list = o->ptr;
        // 刪除左端元素
        for (j = 0; j < ltrim; j++) {
            ln = listFirst(list);
            listDelNode(list,ln);
        }
        // 刪除右端元素
        for (j = 0; j < rtrim; j++) {
            ln = listLast(list);
            listDelNode(list,ln);
        }
    } else {
        redisPanic("Unknown list encoding");
    }
    // 傳送通知
    notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
    // 如果列表已經為空,那麼刪除它
    if (listTypeLength(o) == 0) {
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    server.dirty++;
    addReply(c,shared.ok);
}

移除列表中指定的值

void lremCommand(redisClient *c) {
    robj *subject, *obj;
    // 編碼目標物件 elem
    obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
    long toremove;
    long removed = 0;
    listTypeEntry entry;
    // 取出指定刪除模式的 count 引數
    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
        return;
    // 取出列表物件
    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
    /* Make sure obj is raw when we're dealing with a ziplist */
    if (subject->encoding == REDIS_ENCODING_ZIPLIST)