redis原始碼分析與思考(十四)——列表型別的命令實現(t_list.c)
阿新 • • 發佈:2018-11-12
列表型別是用來存貯多個字串物件的結構。一個列表可以存貯232-1個元素,可以對列表兩端進行插入(push)、彈出(pop),還可以獲取指定範圍內的元素列表、獲取指定索引的元素等等,它可以靈活的充當棧和佇列的角色。下面列出列表的命令:
列表命令
操作說明 | 命令 | 時間複雜度 |
---|---|---|
向尾部新增 | rpush key value [value…] | O(n) |
向頭部新增 | lpush key value [value…] | O(n) |
插入在指定鍵前/後 | linsert key before/after pivot value | O(n) |
返回範圍內的鍵 | lrange key start end | O(n) |
返回索引的鍵 | lindex key index | O(n) |
返回列表的長度 | llen key index | O(1) |
彈出頭部節點 | lpop key | O(1) |
彈出尾部節點 | rpop key | O(1) |
刪除指定cout數目且值等於value的鍵 | lremkey count value | O(n) |
刪除指定範圍的鍵 | ltrim key start end | O(n) |
修改指定索引的鍵值 | lset key index value | O(n) |
阻塞操作 | blpop、brpop | O(1) |
編碼的轉換
上次在介紹redis物件時提到過,列表物件的編碼格式會在一定情況下從ziplist轉變成adlist連結串列。列表每次push元素時,都會檢測是否滿足這一情況:
/*
* 對輸入值 value 進行檢查,看是否需要將 subject 從 ziplist 轉換為雙端連結串列,
* 以便儲存值 value 。
* 函式只對 REDIS_ENCODING_RAW 編碼的 value 進行檢查,
* 因為整數編碼的值不可能超長。
*/
//server.list_max_ziplist_value是伺服器的屬性,可更改,預設64位元組
void listTypeTryConversion(robj *subject, robj *value) {
// 確保 subject 為 ZIPLIST 編碼
if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
if (sdsEncodedObject(value) &&
// 看字串是否過長
sdslen(value->ptr) > server.list_max_ziplist_value)
// 將編碼轉換為雙端連結串列
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
}
/*
* 將列表的底層編碼從 ziplist 轉換成雙端連結串列
*/
void listTypeConvert(robj *subject, int enc) {
//定義一個連結串列的迭代器
listTypeIterator *li;
listTypeEntry entry;
redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
// 轉換成雙端連結串列
if (enc == REDIS_ENCODING_LINKEDLIST) {
list *l = listCreate();
//設定連結串列的釋放函式
listSetFreeMethod(l,decrRefCountVoid);
/* listTypeGet returns a robj with incremented refcount */
// 遍歷 ziplist ,並將裡面的值全部新增到雙端連結串列中
li = listTypeInitIterator(subject,0,REDIS_TAIL);
while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
listTypeReleaseIterator(li);
// 更新編碼
subject->encoding = REDIS_ENCODING_LINKEDLIST;
// 釋放原來的 ziplist
zfree(subject->ptr);
// 更新物件值指標
subject->ptr = l;
} else {
redisPanic("Unsupported list conversion");
}
}
也就是說,當列表裡插入的元素位元組長大於64位元組時,列表會啟動編碼轉換程式。
列表的新增
不僅是插入的元素位元組長大於預設的64位元組會激動編碼轉換,新增元素的底層實現,在插入之前會檢測列表的總長度是否大於預設ziplist最大長度也會引起編碼轉換,預設長度是512個元素,可以在伺服器的配置中更改,列表插入底層實現如下:
//server.list_max_ziplist_entries表示最大長度,where表示插入到表頭還是表尾
void listTypePush(robj *subject, robj *value, int where) {
/* Check if we need to convert the ziplist */
// 是否需要轉換編碼?
listTypeTryConversion(subject,value);
if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
// ZIPLIST
if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
// 取出物件的值,因為 ZIPLIST 只能儲存字串或整數
value = getDecodedObject(value);
subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
decrRefCount(value);
// 雙端連結串列
} else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
if (where == REDIS_HEAD) {
listAddNodeHead(subject->ptr,value);
} else {
listAddNodeTail(subject->ptr,value);
}
incrRefCount(value);
// 未知編碼
} else {
redisPanic("Unknown list encoding");
}
}
/*
* 將物件 value 插入到列表節點的之前或之後。
* where 引數決定了插入的位置:
* - REDIS_HEAD 插入到節點之前
* - REDIS_TAIL 插入到節點之後
*/
void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
robj *subject = entry->li->subject;
// 插入到 ZIPLIST
if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
// 返回物件未編碼的值
value = getDecodedObject(value);
if (where == REDIS_TAIL) {
unsigned char *next = ziplistNext(subject->ptr,entry->zi);
/* When we insert after the current element, but the current element
* is the tail of the list, we need to do a push. */
if (next == NULL) {
// next 是表尾節點,push 新節點到表尾
subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
} else {
// 插入到到節點之後
subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
}
} else {
subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
}
decrRefCount(value);
// 插入到雙端連結串列
} else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
if (where == REDIS_TAIL) {
listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
} else {
listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
}
incrRefCount(value);
} else {
redisPanic("Unknown list encoding");
}
}
PUSH系列命令的底層實現:
void pushGenericCommand(redisClient *c, int where) {
int j, waiting = 0, pushed = 0;
// 取出列表物件
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
// 如果列表物件不存在,那麼可能有客戶端在等待這個鍵的出現
int may_have_waiting_clients = (lobj == NULL);
if (lobj && lobj->type != REDIS_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
// 將列表狀態設定為就緒
if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
// 遍歷所有輸入值,並將它們新增到列表中
for (j = 2; j < c->argc; j++) {
// 編碼值
c->argv[j] = tryObjectEncoding(c->argv[j]);
// 如果列表物件不存在,那麼建立一個,並關聯到資料庫
if (!lobj) {
lobj = createZiplistObject();
dbAdd(c->db,c->argv[1],lobj);
}
// 將值推入到列表
listTypePush(lobj,c->argv[j],where);
pushed++;
}
// 返回新增的節點數量
addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
// 如果至少有一個元素被成功推入,那麼執行以下程式碼
if (pushed) {
char *event = (where == REDIS_HEAD) ? "lpush" : "rpush";
// 傳送鍵修改訊號
signalModifiedKey(c->db,c->argv[1]);
// 傳送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
}
server.dirty += pushed;
}
索引
索引的策略很簡單,判斷是哪種編碼格式呼叫哪種遍歷:
void lindexCommand(redisClient *c) {
robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,REDIS_LIST)) return;
long index;
robj *value = NULL;
// 取出整數值物件 index
if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
return;
// 根據索引,遍歷 ziplist ,直到指定位置
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
p = ziplistIndex(o->ptr,index);
if (ziplistGet(p,&vstr,&vlen,&vlong)) {
if (vstr) {
value = createStringObject((char*)vstr,vlen);
} else {
value = createStringObjectFromLongLong(vlong);
}
addReplyBulk(c,value);
decrRefCount(value);
} else {
addReply(c,shared.nullbulk);
}
// 根據索引,遍歷雙端連結串列,直到指定位置
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
listNode *ln = listIndex(o->ptr,index);
if (ln != NULL) {
value = listNodeValue(ln);
addReplyBulk(c,value);
} else {
addReply(c,shared.nullbulk);
}
} else {
redisPanic("Unknown list encoding");
}
}
pop操作
pop操作其實就是將佇列的pop操作而已,彈出元素後如果列表為空,則刪除該列表:
void popGenericCommand(redisClient *c, int where) {
// 取出列表物件
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
if (o == NULL || checkType(c,o,REDIS_LIST)) return;
// 彈出列表元素
robj *value = listTypePop(o,where);
// 根據彈出元素是否為空,決定後續動作
if (value == NULL) {
addReply(c,shared.nullbulk);
} else {
char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
//回覆給客戶端,彈出的value
addReplyBulk(c,value);
//引用計數減1
decrRefCount(value);
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,c->argv[1],c->db->id);
if (listTypeLength(o) == 0) {
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
c->argv[1],c->db->id);
dbDelete(c->db,c->argv[1]);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
}
}
裁剪操作
//LTRIM命令
void ltrimCommand(redisClient *c) {
robj *o;
long start, end, llen, j, ltrim, rtrim;
list *list;
listNode *ln;
// 取出索引值 start 和 end
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
// 取出列表物件
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
checkType(c,o,REDIS_LIST)) return;
// 列表長度
llen = listTypeLength(o);
/* convert negative indexes */
// 將負數索引轉換成正數索引
if (start < 0) start = llen+start;
if (end < 0) end = llen+end;
if (start < 0) start = 0;
/* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length. */
if (start > end || start >= llen) {
/* Out of range start or start > end result in empty list */
ltrim = llen;
rtrim = 0;
} else {
if (end >= llen) end = llen-1;
ltrim = start;
rtrim = llen-end-1;
}
/* Remove list elements to perform the trim */
// 刪除指定列表兩端的元素
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
// 刪除左端元素
o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
// 刪除右端元素
o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
} else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
list = o->ptr;
// 刪除左端元素
for (j = 0; j < ltrim; j++) {
ln = listFirst(list);
listDelNode(list,ln);
}
// 刪除右端元素
for (j = 0; j < rtrim; j++) {
ln = listLast(list);
listDelNode(list,ln);
}
} else {
redisPanic("Unknown list encoding");
}
// 傳送通知
notifyKeyspaceEvent(REDIS_NOTIFY_LIST,"ltrim",c->argv[1],c->db->id);
// 如果列表已經為空,那麼刪除它
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[1]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
}
signalModifiedKey(c->db,c->argv[1]);
server.dirty++;
addReply(c,shared.ok);
}
移除列表中指定的值
void lremCommand(redisClient *c) {
robj *subject, *obj;
// 編碼目標物件 elem
obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
long toremove;
long removed = 0;
listTypeEntry entry;
// 取出指定刪除模式的 count 引數
if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
return;
// 取出列表物件
subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
/* Make sure obj is raw when we're dealing with a ziplist */
if (subject->encoding == REDIS_ENCODING_ZIPLIST)