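Redis (6): list/lpush/lrange/lpop command source code analysis
The previous article covered how the hash data type is implemented. Even if not everything clicked, you should at least have a feel for how Redis goes about things.
In this article we continue with another Redis data type: the list and its operations.
As before, we start from the user's point of view: first the features that list offers, then the data structures that carry them, and finally the concrete implementation.
0. Redis list commands
The usage of each command can be found in the official manual.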
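1> BLPOP key1 [key2] timeout
Function: removes and returns the first element of the list. If the list is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of LPOP.)
Return value: the key the element was popped from and the popped value, or nil on timeout
2> BRPOP key1 [key2] timeout
Function: removes and returns the last element of the list. If the list is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of RPOP.)
Return value: the key the element was popped from and the popped value, or nil on timeout
3> BRPOPLPUSH source destination timeout
Function: pops a value from the tail of one list, pushes it onto the head of another list, and returns it. If the source list is empty, the client blocks until the timeout expires or an element becomes available. (Blocking version of RPOPLPUSH.)
Return value: the transferred element, or nil
4> LINDEX key index
Function: gets an element of the list by index
Return value: the element found, or nil when the index is out of range
5> LINSERT key BEFORE|AFTER pivot value
Function: inserts an element before or after the pivot element
Return value: the list length after the insert, or -1 if pivot is not found
6> LLEN key
Function: gets the list length
Return value: the list length
7> LPOP key
Function: removes and returns the first element of the list
Return value: the first element, or nil
8> LPUSH key value1 [value2]
Function: inserts one or more values at the head of the list
Return value: the list length after the insert
9> LPUSHX key value
Function: inserts a value at the head of an existing list; does nothing if the key does not exist
Return value: the list length after the operation
10> LRANGE key start stop
Function: gets the elements in the given range (both bounds inclusive)
Return value: the list of values
11> LREM key count value
Function: removes elements equal to value. count > 0: remove the first count matches scanning from head to tail; count < 0: remove the first |count| matches scanning from tail to head; count = 0: remove all matches
Return value: the number of elements removed
12> LSET key index value
Function: sets the value of the element at the given index
Return value: OK, or an error
13> LTRIM key start stop
Function: trims a list so that only the elements inside the given range are kept; everything outside the range is deleted
Return value: OK
14> RPOP key
Function: removes and returns the last element of the list
Return value: the last element, or nil
15> RPOPLPUSH source destination
Function: removes the last element of one list, pushes it onto the head of another list, and returns it
Return value: the transferred element
16> RPUSH key value1 [value2]
Function: appends one or more values to the tail of the list
Return value: the list length after the insert
17> RPUSHX key value
Function: appends a value to the tail of an existing list
Return value: the list length after the operation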
Inside Redis, these commands are registered in the command table as follows:
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
{"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
{"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
{"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
{"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
{"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
{"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
{"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
{"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
{"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
{"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
{"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
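In each table entry the third field is the arity and the fourth is the flag string (w = write, r = read-only, m = may grow memory so it is refused when out of memory, F = fast command, s = not allowed in scripts). A positive arity demands exactly that many arguments, counting the command name; a negative arity means "at least that many". A minimal sketch of that rule follows; `arity_ok` is a hypothetical helper written for illustration, not a function from the Redis source.

```c
#include <stdbool.h>

/* Hypothetical helper (not part of Redis): mirrors the arity rule applied
 * before a command handler runs. argc counts the command name itself,
 * so "LPUSH k v" has argc == 3. */
bool arity_ok(int arity, int argc) {
    if (arity > 0) {
        return argc == arity;   /* positive arity: exact argument count */
    }
    return argc >= -arity;      /* negative arity: at least -arity arguments */
}
```

With the table above, `lpush` (arity -3) accepts `LPUSH k v1 v2 v3`, while `lpop` (arity 2) accepts only `LPOP k`.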
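1. List data structures
When we hear "list" or "linked list", which data structures come to mind? Singly linked lists, doubly linked lists, circular lists... all of them look simple enough. Is there anything else? Let's see how Redis does it: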
// quicklist is the actual data container. It keeps head and tail pointers and
// can be traversed from either end, so it is a doubly linked list.
/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
 * 'count' is the number of total entries.
 * 'len' is the number of quicklist nodes.
 * 'compress' is: -1 if compression disabled, otherwise it's the number
 *                of quicklistNodes to leave uncompressed at ends of quicklist.
 * 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
    // head node
    quicklistNode *head;
    // tail node
    quicklistNode *tail;
    // total number of elements
    unsigned long count;        /* total count of all entries in all ziplists */
    // number of quicklistNodes; one node may hold many elements
    unsigned int len;           /* number of quicklistNodes */
    // fill factor
    int fill : 16;              /* fill factor for individual nodes */
    // how many nodes at each end are left uncompressed
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;

// Describes one element located inside a quicklist. It is filled in by
// lookups and iteration; it is not a list node itself.
typedef struct quicklistEntry {
    const quicklist *quicklist;
    quicklistNode *node;
    // pointer to the current element inside the node's ziplist
    unsigned char *zi;
    // the plain string value, taken from zi (NULL for integer-encoded elements)
    unsigned char *value;
    // length of value in bytes
    unsigned int sz;
    // the value when the element is integer-encoded
    long long longval;
    // offset of the element inside the current node
    int offset;
} quicklistEntry;

// The nodes of the linked list are quicklistNode structs.
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
 * We use bit fields keep the quicklistNode at 32 bytes.
 * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
 * encoding: 2 bits, RAW=1, LZF=2.
 * container: 2 bits, NONE=1, ZIPLIST=2.
 * recompress: 1 bit, bool, true if node is temporarily decompressed for usage.
 * attempted_compress: 1 bit, boolean, used for verifying during testing.
 * extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    // zl is a ziplist that stores 'count' element values
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can't compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;

// list iterator
typedef struct quicklistIter {
    const quicklist *quicklist;
    quicklistNode *current;
    unsigned char *zi;
    long offset; /* offset in current ziplist */
    int direction;
} quicklistIter;

// decoded view of a single ziplist entry
typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen;
    unsigned int lensize, len;
    unsigned int headersize;
    unsigned char encoding;
    unsigned char *p;
} zlentry;
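The comments in the listing claim that both quicklist and quicklistNode occupy 32 bytes on 64-bit systems, thanks to the bit fields. The sketch below checks that claim on copies of the two structs. The names `ql_node`, `ql_list`, `QL_NODE_FLAG_BITS` and `ql_layout_is_32_bytes` are hypothetical, introduced only so the snippet compiles without the Redis headers, and the result assumes a typical 64-bit ABI (8-byte pointers, 4-byte int).

```c
#include <stddef.h>

/* Copy of quicklistNode under a hypothetical name. */
typedef struct ql_node {
    struct ql_node *prev;
    struct ql_node *next;
    unsigned char *zl;
    unsigned int sz;
    unsigned int count : 16;
    unsigned int encoding : 2;
    unsigned int container : 2;
    unsigned int recompress : 1;
    unsigned int attempted_compress : 1;
    unsigned int extra : 10;
} ql_node;

/* Copy of quicklist under a hypothetical name. */
typedef struct ql_list {
    ql_node *head;
    ql_node *tail;
    unsigned long count;
    unsigned int len;
    int fill : 16;
    unsigned int compress : 16;
} ql_list;

/* Bits used by the node's bit fields: together they fill one 32-bit unit. */
enum { QL_NODE_FLAG_BITS = 16 + 2 + 2 + 1 + 1 + 10 };

/* Returns 1 when both structs are 32 bytes on this platform.
 * Assumption: only expected to hold on a typical 64-bit ABI. */
int ql_layout_is_32_bytes(void) {
    return sizeof(ql_node) == 32 && sizeof(ql_list) == 32;
}
```

Note that the declared widths add up to 32 bits with `extra : 10`, although the comment in the listing says 12 bits.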
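2. rpush/lpush: adding elements
rpush adds elements at the tail and lpush adds them at the head. The two are essentially the same operation, and Redis does in fact reuse a single code path for both.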
// t_list.c, lpush void lpushCommand(client *c) { // 使用 LIST_HEAD|LIST_TAIL 作為插入位置標識 pushGenericCommand(c,LIST_HEAD); } void rpushCommand(client *c) { pushGenericCommand(c,LIST_TAIL); } // t_list.c, 實際的push操作 void pushGenericCommand(client *c, int where) { int j, waiting = 0, pushed = 0; // 在db中查詢對應的key例項,查到或者查不到 robj *lobj = lookupKeyWrite(c->db,c->argv[1]); // 查到的情況下,需要驗證資料型別 if (lobj && lobj->type != OBJ_LIST) { addReply(c,shared.wrongtypeerr); return; } for (j = 2; j < c->argc; j++) { c->argv[j] = tryObjectEncoding(c->argv[j]); if (!lobj) { // 1. 在沒有key例項的情況下,先建立key例項到db中 lobj = createQuicklistObject(); // 2. 設定 fill和depth 引數 // fill 預設: -2 // depth 預設: 0 quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size, server.list_compress_depth); dbAdd(c->db,c->argv[1],lobj); } // 3. 一個個元素新增進去 listTypePush(lobj,c->argv[j],where); pushed++; } // 返回list長度 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0)); if (pushed) { // 命令傳播 char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; } // 1. 建立初始list // object.c, 建立初始list robj *createQuicklistObject(void) { quicklist *l = quicklistCreate(); robj *o = createObject(OBJ_LIST,l); o->encoding = OBJ_ENCODING_QUICKLIST; return o; } // quicklist.c, 建立一個新的list容器,初始化預設值 /* Create a new quicklist. * Free with quicklistRelease(). */ quicklist *quicklistCreate(void) { struct quicklist *quicklist; quicklist = zmalloc(sizeof(*quicklist)); quicklist->head = quicklist->tail = NULL; quicklist->len = 0; quicklist->count = 0; quicklist->compress = 0; quicklist->fill = -2; return quicklist; } // 2. 
設定quicklist 的fill和depth 值 // quicklist.c void quicklistSetOptions(quicklist *quicklist, int fill, int depth) { quicklistSetFill(quicklist, fill); quicklistSetCompressDepth(quicklist, depth); } // quicklist.c, 設定 fill 引數 void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; } else if (fill < -5) { fill = -5; } quicklist->fill = fill; } // quicklist.c, 設定 depth 引數 void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; } else if (compress < 0) { compress = 0; } quicklist->compress = compress; } // 3. 將元素新增進list中 // t_list.c, /* The function pushes an element to the specified list object 'subject', * at head or tail position as specified by 'where'. * * There is no need for the caller to increment the refcount of 'value' as * the function takes care of it if needed. */ void listTypePush(robj *subject, robj *value, int where) { if (subject->encoding == OBJ_ENCODING_QUICKLIST) { int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL; // 解碼value value = getDecodedObject(value); size_t len = sdslen(value->ptr); // 將value新增到連結串列中 quicklistPush(subject->ptr, value->ptr, len, pos); // 減小value的引用,如果是被解編碼後的物件,此時會將記憶體釋放 decrRefCount(value); } else { serverPanic("Unknown list encoding"); } } // object.c /* Get a decoded version of an encoded object (returned as a new object). * If the object is already raw-encoded just increment the ref count. 
*/ robj *getDecodedObject(robj *o) { robj *dec; // OBJ_ENCODING_RAW,OBJ_ENCODING_EMBSTR 編碼直接返回,引用計數+1(原因是: 原始robj一個引用,轉換後的robj一個引用) if (sdsEncodedObject(o)) { incrRefCount(o); return o; } if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) { char buf[32]; // 整型轉換為字元型,返回string型的robj ll2string(buf,32,(long)o->ptr); dec = createStringObject(buf,strlen(buf)); return dec; } else { serverPanic("Unknown encoding type"); } } // quicklist.c, 新增value到連結串列中 /* Wrapper to allow argument-based switching between HEAD/TAIL pop */ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, int where) { // 根據where決定新增到表頭還表尾 if (where == QUICKLIST_HEAD) { quicklistPushHead(quicklist, value, sz); } else if (where == QUICKLIST_TAIL) { quicklistPushTail(quicklist, value, sz); } } // quicklist.c, 新增表頭資料 /* Add new entry to head node of quicklist. * * Returns 0 if used existing head. * Returns 1 if new head created. */ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; // likely 對不同平臺處理 __builtin_expect(!!(x), 1), // 判斷是否允許插入元素,實際上是判斷 head 的ziplist空間是否已佔滿, 沒有則直接往裡面插入即可 // fill 預設: -2 // depth 預設: 0 if (likely( _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { // 3.1. 新增head節點的zl連結串列中, zl 為ziplist 連結串列節點 quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 3.2. 更新頭節點size大小 quicklistNodeUpdateSz(quicklist->head); } else { // 如果head已佔滿,則建立一個新的 quicklistNode 節點進行插入 quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node); // 3.3. 
插入新節點到head之前 _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } // 將連結串列計數+1, 避免獲取總數時迭代計算 quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); } // quicklist.c, 判斷是否允許插入元素 REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node, const int fill, const size_t sz) { if (unlikely(!node)) return 0; int ziplist_overhead; /* size of previous offset */ if (sz < 254) ziplist_overhead = 1; else ziplist_overhead = 5; /* size of forward offset */ if (sz < 64) ziplist_overhead += 1; else if (likely(sz < 16384)) ziplist_overhead += 2; else ziplist_overhead += 5; /* new_sz overestimates if 'sz' encodes to an integer type */ // 加上需要新增的新元素的長度後,進行閥值判定,如果在閥值內,則返回1,否則返回0 unsigned int new_sz = node->sz + sz + ziplist_overhead; // 使用fill引數判定 if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill))) return 1; else if (!sizeMeetsSafetyLimit(new_sz)) return 0; else if ((int)node->count < fill) return 1; else return 0; } // quicklist.c REDIS_STATIC int _quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz, const int fill) { if (fill >= 0) return 0; size_t offset = (-fill) - 1; // /* Optimization levels for size-based filling */ // static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; // offset < 5, offset 預設將等於 1, sz <= 8292 if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) { if (sz <= optimization_level[offset]) { return 1; } else { return 0; } } else { return 0; } } // SIZE_SAFETY_LIMIT 8192 #define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT) // 3.1. 向每個連結串列節點中新增value, 實際是向 ziplist push 資料 // ziplist.c, push *s 資料到 zl 中 unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) { unsigned char *p; p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl); // 具體新增元素方法,有點複雜。簡單點說就是 判斷容量、擴容、按照ziplist協議新增元素 return __ziplistInsert(zl,p,s,slen); } // ziplist.c, 在hash的資料介紹時已詳細介紹 /* Insert item at "p". 
*/ static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; unsigned int prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsigned char encoding = 0; long long value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail; /* Find out prevlen for the entry that is inserted. */ if (p[0] != ZIP_END) { ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else { unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } } /* See if the entry can be encoded */ if (zipTryEncoding(s,slen,&value,&encoding)) { /* 'encoding' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding); } else { /* 'encoding' is untouched, however zipEncodeLength will use the * string length to figure out how to encode it. */ reqlen = slen; } /* We need space for both the length of the previous entry and * the length of the payload. */ reqlen += zipPrevEncodeLength(NULL,prevlen); reqlen += zipEncodeLength(NULL,encoding,slen); /* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry's length in * its prevlen field. */ nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; /* Store offset because a realloc may change the address of zl. */ offset = p-zl; zl = ziplistResize(zl,curlen+reqlen+nextdiff); p = zl+offset; /* Apply memory move when necessary and update tail offset. */ if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); /* Encode this entry's raw length in the next entry. 
*/ zipPrevEncodeLength(p+reqlen,reqlen); /* Update offset for tail */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen); /* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn't have an effect on the *tail* offset. */ zipEntry(p+reqlen, &tail); if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); } /* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; } /* Write the entry */ p += zipPrevEncodeLength(p,prevlen); p += zipEncodeLength(p,encoding,slen); if (ZIP_IS_STR(encoding)) { memcpy(p,s,slen); } else { zipSaveInteger(p,value,encoding); } ZIPLIST_INCR_LENGTH(zl,1); return zl; } // 3.2. 更新node的size (實際佔用記憶體空間大小) // quicklist.c, 更新node的size, 其實就是重新統計node的ziplist長度 #define quicklistNodeUpdateSz(node) \ do { \ (node)->sz = ziplistBlobLen((node)->zl); \ } while (0) // 3.3. 新增新連結串列節點到head之前 // quicklist.c, /* Wrappers for node inserting around existing node. */ REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist, quicklistNode *old_node, quicklistNode *new_node) { __quicklistInsertNode(quicklist, old_node, new_node, 0); } /* Insert 'new_node' after 'old_node' if 'after' is 1. * Insert 'new_node' before 'old_node' if 'after' is 0. * Note: 'new_node' is *always* uncompressed, so if we assign it to * head or tail, we do not need to uncompress it. 
*/ REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist, quicklistNode *old_node, quicklistNode *new_node, int after) { if (after) { new_node->prev = old_node; if (old_node) { new_node->next = old_node->next; if (old_node->next) old_node->next->prev = new_node; old_node->next = new_node; } if (quicklist->tail == old_node) quicklist->tail = new_node; } else { // 插入new_node到old_node之前 new_node->next = old_node; if (old_node) { new_node->prev = old_node->prev; if (old_node->prev) old_node->prev->next = new_node; old_node->prev = new_node; } // 替換頭節點位置 if (quicklist->head == old_node) quicklist->head = new_node; } /* If this insert creates the only element so far, initialize head/tail. */ // 第一個元素 if (quicklist->len == 0) { quicklist->head = quicklist->tail = new_node; } // 壓縮list if (old_node) quicklistCompress(quicklist, old_node); quicklist->len++; } // quicklist.c, 壓縮list #define quicklistCompress(_ql, _node) \ do { \ if ((_node)->recompress) \ // recompress quicklistCompressNode((_node)); \ else \ // __quicklistCompress((_ql), (_node)); \ } while (0) // recompress /* Compress only uncompressed nodes. */ #define quicklistCompressNode(_node) \ do { \ if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_RAW) { \ __quicklistCompressNode((_node)); \ } \ } while (0) /* Compress the ziplist in 'node' and update encoding details. * Returns 1 if ziplist compressed successfully. * Returns 0 if compression failed or if ziplist too small to compress. 
*/ REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) { #ifdef REDIS_TEST node->attempted_compress = 1; #endif /* Don't bother compressing small values */ if (node->sz < MIN_COMPRESS_BYTES) return 0; quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz); /* Cancel if compression fails or doesn't compress small enough */ // lzf 壓縮演算法,有點複雜咯 if (((lzf->sz = lzf_compress(node->zl, node->sz, lzf->compressed, node->sz)) == 0) || lzf->sz + MIN_COMPRESS_IMPROVE >= node->sz) { /* lzf_compress aborts/rejects compression if value not compressable. */ zfree(lzf); return 0; } lzf = zrealloc(lzf, sizeof(*lzf) + lzf->sz); zfree(node->zl); node->zl = (unsigned char *)lzf; node->encoding = QUICKLIST_NODE_ENCODING_LZF; node->recompress = 0; return 1; } /* Force 'quicklist' to meet compression guidelines set by compress depth. * The only way to guarantee interior nodes get compressed is to iterate * to our "interior" compress depth then compress the next node we find. * If compress depth is larger than the entire list, we return immediately. */ REDIS_STATIC void __quicklistCompress(const quicklist *quicklist, quicklistNode *node) { /* If length is less than our compress depth (from both sides), * we can't compress anything. 
*/ if (!quicklistAllowsCompression(quicklist) || quicklist->len < (unsigned int)(quicklist->compress * 2)) return; #if 0 /* Optimized cases for small depth counts */ if (quicklist->compress == 1) { quicklistNode *h = quicklist->head, *t = quicklist->tail; quicklistDecompressNode(h); quicklistDecompressNode(t); if (h != node && t != node) quicklistCompressNode(node); return; } else if (quicklist->compress == 2) { quicklistNode *h = quicklist->head, *hn = h->next, *hnn = hn->next; quicklistNode *t = quicklist->tail, *tp = t->prev, *tpp = tp->prev; quicklistDecompressNode(h); quicklistDecompressNode(hn); quicklistDecompressNode(t); quicklistDecompressNode(tp); if (h != node && hn != node && t != node && tp != node) { quicklistCompressNode(node); } if (hnn != t) { quicklistCompressNode(hnn); } if (tpp != h) { quicklistCompressNode(tpp); } return; } #endif /* Iterate until we reach compress depth for both sides of the list.a * Note: because we do length checks at the *top* of this function, * we can skip explicit null checks below. Everything exists. */ quicklistNode *forward = quicklist->head; quicklistNode *reverse = quicklist->tail; int depth = 0; int in_depth = 0; while (depth++ < quicklist->compress) { // 解壓縮??? quicklistDecompressNode(forward); quicklistDecompressNode(reverse); if (forward == node || reverse == node) in_depth = 1; if (forward == reverse) return; forward = forward->next; reverse = reverse->prev; } if (!in_depth) quicklistCompressNode(node); if (depth > 2) { /* At this point, forward and reverse are one node beyond depth */ // 壓縮 quicklistCompressNode(forward); quicklistCompressNode(reverse); } } /* Decompress only compressed nodes. */ #define quicklistDecompressNode(_node) \ do { \ if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_LZF) { \ __quicklistDecompressNode((_node)); \ } \ } while (0) /* Uncompress the ziplist in 'node' and update encoding details. * Returns 1 on successful decode, 0 on failure to decode. 
*/ REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) { #ifdef REDIS_TEST node->attempted_compress = 0; #endif void *decompressed = zmalloc(node->sz); quicklistLZF *lzf = (quicklistLZF *)node->zl; if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) { /* Someone requested decompress, but we can't decompress. Not good. */ zfree(decompressed); return 0; } zfree(lzf); node->zl = decompressed; node->encoding = QUICKLIST_NODE_ENCODING_RAW; return 1; }
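The compress depth decides which nodes __quicklistCompress leaves alone: `depth` nodes at each end stay uncompressed and only interior nodes are candidates. A minimal sketch of that policy follows; `node_is_compress_candidate` is a hypothetical model, not a Redis function. It ignores the extra conditions in __quicklistCompressNode (nodes smaller than MIN_COMPRESS_BYTES, or LZF not saving enough space, stay raw).

```c
#include <stdbool.h>

/* Hypothetical model (not part of Redis).
 * len:   number of quicklist nodes
 * depth: compress depth (0 disables compression)
 * pos:   0-based position of the node, counted from the head */
bool node_is_compress_candidate(unsigned int len, unsigned int depth,
                                unsigned int pos) {
    if (depth == 0) return false;        /* compression is off */
    if (pos >= len) return false;        /* no such node */
    if (len < 2 * depth) return false;   /* list too short, nothing is compressed */
    /* the first 'depth' and the last 'depth' nodes stay uncompressed */
    return pos >= depth && pos < len - depth;
}
```

With five nodes and depth 1, only positions 1 to 3 are candidates; the head and tail stay raw so that push and pop at either end never pay for decompression.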
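Overall, the Redis list is not a plain singly or doubly linked list. It is a doubly linked list of quicklistNode structs, each holding a ziplist, which saves memory while keeping lookup cost relatively low. The push path, in call order:
lpushCommand / rpushCommand
-> pushGenericCommand
-> lookupKeyWrite, then a type check; if the key is missing: createQuicklistObject, quicklistSetOptions, dbAdd
-> listTypePush -> quicklistPush -> quicklistPushHead / quicklistPushTail
-> _quicklistNodeAllowInsert
   allowed: ziplistPush into the existing end node, then quicklistNodeUpdateSz
   not allowed (head case): quicklistCreateNode, ziplistPush into a new ziplist, then _quicklistInsertNodeBefore, which may compress an interior node
-> reply with the new list length, signalModifiedKey, notifyKeyspaceEvent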
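Whether a push reuses the end node or creates a new one comes down to the fill factor. A negative fill selects a byte limit per node (-1 is 4096, -2 is 8192, up to -5 at 65536), and a non-negative fill is a limit on the element count, still subject to the 8192-byte safety limit. The sketch below restates the logic of _quicklistNodeAllowInsert in self-contained form; `node_model`, `fill_size_limit`, `entry_overhead` and `allow_insert` are hypothetical names used for illustration.

```c
#include <stddef.h>

#define SAFETY_LIMIT 8192  /* same value as SIZE_SAFETY_LIMIT in the listing */

/* Hypothetical stand-in for the two quicklistNode fields the check reads. */
typedef struct {
    size_t sz;           /* current ziplist size in bytes */
    unsigned int count;  /* current number of entries */
} node_model;

static const size_t size_limits[] = {4096, 8192, 16384, 32768, 65536};

/* Byte limit selected by a negative fill; 0 when fill is not size-based. */
size_t fill_size_limit(int fill) {
    if (fill >= 0) return 0;
    size_t level = (size_t)(-fill) - 1;
    if (level >= sizeof(size_limits) / sizeof(size_limits[0])) return 0;
    return size_limits[level];
}

/* Estimated ziplist header bytes for a new entry of 'sz' payload bytes. */
size_t entry_overhead(size_t sz) {
    size_t overhead = (sz < 254) ? 1 : 5;   /* previous-entry length field */
    if (sz < 64) overhead += 1;             /* own length/encoding field */
    else if (sz < 16384) overhead += 2;
    else overhead += 5;
    return overhead;
}

/* Returns 1 if the entry may go into 'node', 0 if a new node is needed. */
int allow_insert(const node_model *node, int fill, size_t sz) {
    if (!node) return 0;
    size_t new_sz = node->sz + sz + entry_overhead(sz);
    size_t limit = fill_size_limit(fill);
    if (limit != 0 && new_sz <= limit) return 1;  /* size-based fill satisfied */
    if (new_sz > SAFETY_LIMIT) return 0;          /* never exceed the safety limit */
    return (int)node->count < fill;               /* count-based fill */
}
```

With the default fill of -2, a node that already holds 8000 bytes still accepts a 100-byte value (8103 bytes in total) but not a 200-byte one (8203 bytes), so the second push creates a new node.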
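3. lindex/lrange: query operations
Reading data is another core function of a database. Typically there are single-element lookups, batch lookups, and range lookups, so let's go through them one by one.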
// 1. 單個查詢 lindex key index // t_list.c, 通過下標查詢元素值 void lindexCommand(client *c) { robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk); // 如果key本身就不存在,直接返回,空已響應 if (o == NULL || checkType(c,o,OBJ_LIST)) return; long index; robj *value = NULL; // 解析index欄位,賦給 index 變數 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != C_OK)) return; if (o->encoding == OBJ_ENCODING_QUICKLIST) { quicklistEntry entry; // 根據index查詢list資料 if (quicklistIndex(o->ptr, index, &entry)) { // 使用兩個欄位來儲存value if (entry.value) { value = createStringObject((char*)entry.value,entry.sz); } else { value = createStringObjectFromLongLong(entry.longval); } addReplyBulk(c,value); decrRefCount(value); } else { addReply(c,shared.nullbulk); } } else { serverPanic("Unknown list encoding"); } } // quicklist.c, 根據 index 查詢元素 /* Populate 'entry' with the element at the specified zero-based index * where 0 is the head, 1 is the element next to head * and so on. Negative integers are used in order to count * from the tail, -1 is the last element, -2 the penultimate * and so on. If the index is out of range 0 is returned. * * Returns 1 if element found * Returns 0 if element not found */ int quicklistIndex(const quicklist *quicklist, const long long idx, quicklistEntry *entry) { quicklistNode *n; unsigned long long accum = 0; unsigned long long index; int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */ // 初始化 quicklistEntry, 設定預設值 initEntry(entry); entry->quicklist = quicklist; // index為負數時,逆向搜尋 if (!forward) { index = (-idx) - 1; n = quicklist->tail; } else { index = idx; n = quicklist->head; } if (index >= quicklist->count) return 0; while (likely(n)) { // n->count 代表每個list節點裡的實際元素的個數(ziplist裡可能包含n個元素) // 此處代表只會迭代到 index 所在的list節點就停止了 if ((accum + n->count) > index) { break; } else { D("Skipping over (%p) %u at accum %lld", (void *)n, n->count, accum); // 依次迭代 accum += n->count; n = forward ? 
n->next : n->prev; } } // 如果已經迭代完成,說明未找到index元素 if (!n) return 0; D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n, accum, index, index - accum, (-index) - 1 + accum); entry->node = n; if (forward) { /* forward = normal head-to-tail offset. */ // index-accum 代表index節點在 當前n節點中的偏移 entry->offset = index - accum; } else { /* reverse = need negative offset for tail-to-head, so undo * the result of the original if (index < 0) above. */ // 逆向搜尋定位 如-1=1-1+0,-2=2-1+0 entry->offset = (-index) - 1 + accum; } // 解壓縮node資料 quicklistDecompressNodeForUse(entry->node); // 根據offset,查詢ziplist中的sds value entry->zi = ziplistIndex(entry->node->zl, entry->offset); // 從zi中獲取value,sz,longval 返回 (ziplist 協議) ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval); /* The caller will use our result, so we don't re-compress here. * The caller can recompress or delete the node as needed. */ return 1; } // quicklist.c /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ do { \ (e)->zi = (e)->value = NULL; \ (e)->longval = -123456789; \ (e)->quicklist = NULL; \ (e)->node = NULL; \ (e)->offset = 123456789; \ (e)->sz = 0; \ } while (0) // 解壓縮node資料 /* Force node to not be immediately re-compresable */ #define quicklistDecompressNodeForUse(_node) \ do { \ if ((_node) && (_node)->encoding == QUICKLIST_NODE_ENCODING_LZF) { \ __quicklistDecompressNode((_node)); \ (_node)->recompress = 1; \ } \ } while (0) /* Uncompress the ziplist in 'node' and update encoding details. * Returns 1 on successful decode, 0 on failure to decode. */ REDIS_STATIC int __quicklistDecompressNode(quicklistNode *node) { #ifdef REDIS_TEST node->attempted_compress = 0; #endif void *decompressed = zmalloc(node->sz); quicklistLZF *lzf = (quicklistLZF *)node->zl; if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) { /* Someone requested decompress, but we can't decompress. Not good. 
*/ zfree(decompressed); return 0; } zfree(lzf); node->zl = decompressed; node->encoding = QUICKLIST_NODE_ENCODING_RAW; return 1; } // ziplist.c /* Returns an offset to use for iterating with ziplistNext. When the given * index is negative, the list is traversed back to front. When the list * doesn't contain an element at the provided index, NULL is returned. */ unsigned char *ziplistIndex(unsigned char *zl, int index) { unsigned char *p; unsigned int prevlensize, prevlen = 0; if (index < 0) { index = (-index)-1; p = ZIPLIST_ENTRY_TAIL(zl); if (p[0] != ZIP_END) { ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); while (prevlen > 0 && index--) { p -= prevlen; ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } } } else { p = ZIPLIST_ENTRY_HEAD(zl); while (p[0] != ZIP_END && index--) { p += zipRawEntryLength(p); } } return (p[0] == ZIP_END || index > 0) ? NULL : p; }
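The core of quicklistIndex is the node search: walk from the head (or from the tail for a negative index), accumulate each node's count, and stop at the node that contains the target. The sketch below models only that step on an array of per-node counts; `locate` and its parameters are hypothetical names, and the returned offset follows the same convention as entry->offset (non-negative from the node's head, negative from its tail).

```c
#include <stddef.h>

/* Hypothetical model (not part of Redis): counts[i] is the number of
 * entries stored in node i. Returns 1 and fills the outputs when idx is
 * in range, 0 otherwise. */
int locate(const unsigned int *counts, size_t nodes, long long idx,
           size_t *node_out, long long *offset_out) {
    unsigned long long total = 0;
    for (size_t i = 0; i < nodes; i++) total += counts[i];

    int forward = idx >= 0;
    /* -1 becomes 0, -2 becomes 1, ...: distance from the tail */
    unsigned long long index = forward ? (unsigned long long)idx
                                       : (unsigned long long)(-(idx + 1));
    if (index >= total) return 0;

    unsigned long long accum = 0;
    size_t n = forward ? 0 : nodes - 1;
    /* index < total guarantees the walk stops inside the array */
    while (accum + counts[n] <= index) {
        accum += counts[n];
        n = forward ? n + 1 : n - 1;
    }

    *node_out = n;
    *offset_out = forward ? (long long)(index - accum)
                          : -(long long)(index - accum) - 1;
    return 1;
}
```

For nodes holding 3, 2 and 4 elements, index 4 resolves to node 1 at offset 1, and index -5 resolves to the same element expressed as node 1 at offset -1. Only whole nodes are skipped during the walk; the ziplist scan happens once, inside the target node.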
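For range queries, given the patterns we have seen in Redis so far, one might guess that it simply loops over the single-element lookup. Is that the case? Let's look: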
// 2. lrange 範圍查詢 // t_list.c void lrangeCommand(client *c) { robj *o; long start, end, llen, rangelen; // 解析 start,end 引數 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) || (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK)) return; if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_LIST)) return; // list 長度獲取, 有個計數器在 llen = listTypeLength(o); /* convert negative indexes */ if (start < 0) start = llen+start; if (end < 0) end = llen+end; // 將-xx的下標轉換為正數查詢,如果負數過大,則以0計算 if (start < 0) start = 0; /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { addReply(c,shared.emptymultibulk); return; } // end 過大,則限制 // end 不可能小於0,因為上一個 start > end 已限制 if (end >= llen) end = llen-1; rangelen = (end-start)+1; /* Return the result in form of a multi-bulk reply */ addReplyMultiBulkLen(c,rangelen); if (o->encoding == OBJ_ENCODING_QUICKLIST) { // 返回列表迭代器, start-TAIL, LIST_TAIL 代表正向迭代 listTypeIterator *iter = listTypeInitIterator(o, start, LIST_TAIL); // 迭代到 rangelen=0 為止,依次向輸出緩衝輸出 while(rangelen--) { listTypeEntry entry; // 獲取下一個元素 listTypeNext(iter, &entry); quicklistEntry *qe = &entry.entry; if (qe->value) { addReplyBulkCBuffer(c,qe->value,qe->sz); } else { addReplyBulkLongLong(c,qe->longval); } } listTypeReleaseIterator(iter); } else { serverPanic("List encoding is not QUICKLIST!"); } } // t_list.c, 統計list長度 unsigned long listTypeLength(robj *subject) { if (subject->encoding == OBJ_ENCODING_QUICKLIST) { return quicklistCount(subject->ptr); } else { serverPanic("Unknown list encoding"); } } /* Return cached quicklist count */ unsigned int quicklistCount(quicklist *ql) { return ql->count; } // 初始化 list 迭代器 /* Initialize an iterator at the specified index. 
*/ listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) { listTypeIterator *li = zmalloc(sizeof(listTypeIterator)); li->subject = subject; li->encoding = subject->encoding; li->direction = direction; li->iter = NULL; /* LIST_HEAD means start at TAIL and move *towards* head. * LIST_TAIL means start at HEAD and move *towards tail. */ int iter_direction = direction == LIST_HEAD ? AL_START_TAIL : AL_START_HEAD; if (li->encoding == OBJ_ENCODING_QUICKLIST) { li->iter = quicklistGetIteratorAtIdx(li->subject->ptr, iter_direction, index); } else { serverPanic("Unknown list encoding"); } return li; } /* Initialize an iterator at a specific offset 'idx' and make the iterator * return nodes in 'direction' direction. */ quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist, const int direction, const long long idx) { quicklistEntry entry; // 查詢idx 元素先 (前面已介紹, 為 ziplist+quicklist 迭代獲得) if (quicklistIndex(quicklist, idx, &entry)) { // 獲取獲取的是整個list的迭代器, 通過current和offset進行迭代 quicklistIter *base = quicklistGetIterator(quicklist, direction); base->zi = NULL; base->current = entry.node; base->offset = entry.offset; return base; } else { return NULL; } } // quicklist, list迭代器初始化 /* Returns a quicklist iterator 'iter'. After the initialization every * call to quicklistNext() will return the next element of the quicklist. */ quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) { quicklistIter *iter; // 迭代器只包含當前元素 iter = zmalloc(sizeof(*iter)); if (direction == AL_START_HEAD) { iter->current = quicklist->head; iter->offset = 0; } else if (direction == AL_START_TAIL) { iter->current = quicklist->tail; iter->offset = -1; } iter->direction = direction; iter->quicklist = quicklist; iter->zi = NULL; return iter; } // 迭代器攜帶整個list 引用,及當前節點,如何進行迭代,則是重點 // t_list.c, 迭代list元素, 並將 當前節點賦給 entry /* Stores pointer to current the entry in the provided entry structure * and advances the position of the iterator. 
Returns 1 when the current * entry is in fact an entry, 0 otherwise. */ int listTypeNext(listTypeIterator *li, listTypeEntry *entry) { /* Protect from converting when iterating */ serverAssert(li->subject->encoding == li->encoding); entry->li = li; if (li->encoding == OBJ_ENCODING_QUICKLIST) { // 迭代iter(改變iter指向), 賦值給 entry->entry return quicklistNext(li->iter, &entry->entry); } else { serverPanic("Unknown list encoding"); } return 0; } // quicklist.c /* Get next element in iterator. * * Note: You must NOT insert into the list while iterating over it. * You *may* delete from the list while iterating using the * quicklistDelEntry() function. * If you insert into the quicklist while iterating, you should * re-create the iterator after your addition. * * iter = quicklistGetIterator(quicklist,<direction>); * quicklistEntry entry; * while (quicklistNext(iter, &entry)) { * if (entry.value) * [[ use entry.value with entry.sz ]] * else * [[ use entry.longval ]] * } * * Populates 'entry' with values for this iteration. * Returns 0 when iteration is complete or if iteration not possible. * If return value is 0, the contents of 'entry' are not valid. */ int quicklistNext(quicklistIter *iter, quicklistEntry *entry) { initEntry(entry); if (!iter) { D("Returning because no iter!"); return 0; } // 儲存當前node, 及quicklist引用 entry->quicklist = iter->quicklist; entry->node = iter->current; if (!iter->current) { D("Returning because current node is NULL") return 0; } unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL; int offset_update = 0; if (!iter->zi) { /* If !zi, use current index. */ // 初始化時 zi 未賦值,所以直接使用當前元素,使用offset進行查詢 quicklistDecompressNodeForUse(iter->current); iter->zi = ziplistIndex(iter->current->zl, iter->offset); } else { /* else, use existing iterator offset and get prev/next as necessary. 
*/ if (iter->direction == AL_START_HEAD) { nextFn = ziplistNext; offset_update = 1; } else if (iter->direction == AL_START_TAIL) { nextFn = ziplistPrev; offset_update = -1; } // 向前或向後迭代元素 iter->zi = nextFn(iter->current->zl, iter->zi); iter->offset += offset_update; } entry->zi = iter->zi; entry->offset = iter->offset; if (iter->zi) { /* Populate value from existing ziplist position */ // 從 zi 中獲取值返回 (按ziplist協議) ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval); return 1; } else { /* We ran out of ziplist entries. * Pick next node, update offset, then re-run retrieval. */ // 當前ziplist沒有下一個元素了,遞迴查詢下一個ziplist quicklistCompress(iter->quicklist, iter->current); if (iter->direction == AL_START_HEAD) { /* Forward traversal */ D("Jumping to start of next node"); iter->current = iter->current->next; iter->offset = 0; } else if (iter->direction == AL_START_TAIL) { /* Reverse traversal */ D("Jumping to end of previous node"); iter->current = iter->current->prev; iter->offset = -1; } iter->zi = NULL; return quicklistNext(iter, entry); } } // ziplist.c /* Get entry pointed to by 'p' and store in either '*sstr' or 'sval' depending * on the encoding of the entry. '*sstr' is always set to NULL to be able * to find out whether the string pointer or the integer value was set. * Return 0 if 'p' points to the end of the ziplist, 1 otherwise. */ unsigned int ziplistGet(unsigned char *p, unsigned char **sstr, unsigned int *slen, long long *sval) { zlentry entry; if (p == NULL || p[0] == ZIP_END) return 0; if (sstr) *sstr = NULL; zipEntry(p, &entry); if (ZIP_IS_STR(entry.encoding)) { if (sstr) { *slen = entry.len; *sstr = p+entry.headersize; } } else { if (sval) { *sval = zipLoadInteger(p+entry.headersize,entry.encoding); } } return 1; }
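LRANGE does not reuse the single-element lookup code; it walks the list with an iterator instead. None of it is hard, but the call chain is a little convoluted, so it is restated as a sequence diagram:
[Figure: sequence diagram of the LRANGE iterator call chain]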
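The two-level walk performed by quicklistNext (step inside the current ziplist, then jump to the next node when it runs out) can be sketched in a few lines. This is a simplified illustration, not Redis code: `chunk`, `chunk_iter`, `chunk_iter_next` and `chunk_collect` are hypothetical names, each node holds a plain int array instead of a ziplist, only forward traversal is covered, and a loop replaces the recursion.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a quicklist node: a small int array
 * plays the role of the node's ziplist. */
typedef struct chunk {
    struct chunk *next;
    const int *items;
    size_t count;
} chunk;

typedef struct chunk_iter {
    const chunk *current; /* node being walked, NULL once iteration is done */
    size_t offset;        /* index of the next item inside current */
} chunk_iter;

static void chunk_iter_init(chunk_iter *it, const chunk *head) {
    assert(it != NULL);
    it->current = head;
    it->offset = 0;
}

/* Returns 1 and stores the next value in *out, or 0 when iteration is complete. */
static int chunk_iter_next(chunk_iter *it, int *out) {
    assert(it != NULL && out != NULL);
    /* Skip exhausted (or empty) nodes: the "jump to the next node" step. */
    while (it->current != NULL && it->offset >= it->current->count) {
        it->current = it->current->next;
        it->offset = 0;
    }
    if (it->current == NULL) return 0;
    *out = it->current->items[it->offset++];
    return 1;
}

/* Collects up to max values from head to tail; returns how many were written. */
static size_t chunk_collect(const chunk *head, int *dst, size_t max) {
    chunk_iter it;
    size_t n = 0;
    chunk_iter_init(&it, head);
    while (n < max && chunk_iter_next(&it, &dst[n])) n++;
    return n;
}
```

The caller only ever sees "give me the next element"; the node boundaries stay hidden inside the iterator, which is the same division of labour as listTypeNext over quicklistNext.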
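4. LREM: deleting elements
To round out create, delete, update and read, deletion still needs covering. How deletion works naturally depends on the underlying data structure: for example, how is the element to delete located, and does the list need rearranging afterwards?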
// LREM key count value: only count-based removal of matching values is offered;
// deleting a single element is done through this same command
// t_list.c
void lremCommand(client *c) {
    robj *subject, *obj;
    obj = c->argv[3];
    long toremove;
    long removed = 0;

    if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != C_OK))
        return;

    subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
    if (subject == NULL || checkType(c,subject,OBJ_LIST)) return;

    // Several elements may be removed, so iterating and deleting as we go is the natural choice
    listTypeIterator *li;
    if (toremove < 0) {
        toremove = -toremove;
        li = listTypeInitIterator(subject,-1,LIST_HEAD);
    } else {
        li = listTypeInitIterator(subject,0,LIST_TAIL);
    }

    listTypeEntry entry;
    // Iteration was covered in detail in the lookup section
    while (listTypeNext(li,&entry)) {
        // 1. Compare against the target value; only an exact match is deleted
        if (listTypeEqual(&entry,obj)) {
            // 2. The actual deletion
            listTypeDelete(li, &entry);
            server.dirty++;
            removed++;
            if (toremove && removed == toremove) break;
        }
    }
    listTypeReleaseIterator(li);

    // If the list is now empty, delete the key from the db
    if (listTypeLength(subject) == 0) {
        dbDelete(c->db,c->argv[1]);
    }

    addReplyLongLong(c,removed);
    if (removed) signalModifiedKey(c->db,c->argv[1]);
}

// 1. Does the entry equal the given robj?
// t_list.c
/* Compare the given object with the entry at the current position. */
int listTypeEqual(listTypeEntry *entry, robj *o) {
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
        serverAssertWithInfo(NULL,o,sdsEncodedObject(o));
        return quicklistCompare(entry->entry.zi,o->ptr,sdslen(o->ptr));
    } else {
        serverPanic("Unknown list encoding");
    }
}

// quicklist.c
int quicklistCompare(unsigned char *p1, unsigned char *p2, int p2_len) {
    // The element lives inside a ziplist, so the comparison is delegated to ziplist
    return ziplistCompare(p1, p2, p2_len);
}

// ziplist.c
/* Compare entry pointer to by 'p' with 'sstr' of length 'slen'. */
/* Return 1 if equal. */
unsigned int ziplistCompare(unsigned char *p, unsigned char *sstr, unsigned int slen) {
    zlentry entry;
    unsigned char sencoding;
    long long zval, sval;
    if (p[0] == ZIP_END) return 0;

    zipEntry(p, &entry);
    if (ZIP_IS_STR(entry.encoding)) {
        /* Raw compare */
        if (entry.len == slen) {
            return memcmp(p+entry.headersize,sstr,slen) == 0;
        } else {
            return 0;
        }
    } else {
        /* Try to compare encoded values. Don't compare encoding because
         * different implementations may encoded integers differently. */
        if (zipTryEncoding(sstr,slen,&sval,&sencoding)) {
            zval = zipLoadInteger(p+entry.headersize,entry.encoding);
            return zval == sval;
        }
    }
    return 0;
}

// 2. Perform the deletion
// t_list.c
/* Delete the element pointed to. */
void listTypeDelete(listTypeIterator *iter, listTypeEntry *entry) {
    if (entry->li->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklistDelEntry(iter->iter, &entry->entry);
    } else {
        serverPanic("Unknown list encoding");
    }
}

// quicklist.c
/* Delete one element represented by 'entry'
 *
 * 'entry' stores enough metadata to delete the proper position in
 * the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
    quicklistNode *prev = entry->node->prev;
    quicklistNode *next = entry->node->next;
    int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
                                         entry->node, &entry->zi);

    /* after delete, the zi is now invalid for any future usage. */
    iter->zi = NULL;

    /* If current node is deleted, we must update iterator node and offset. */
    if (deleted_node) {
        // The node itself was deleted, so move the iterator to the neighbouring node
        if (iter->direction == AL_START_HEAD) {
            iter->current = next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            iter->current = prev;
            iter->offset = -1;
        }
    }
    /* else if (!deleted_node), no changes needed.
     * we already reset iter->zi above, and the existing iter->offset
     * doesn't move again because:
     *   - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
     *   - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
     *  if we deleted the last element at offet N and now
     *  length of this ziplist is N-1, the next call into
     *  quicklistNext() will jump to the next node. */
}

// quicklist.c
/* Delete one entry from list given the node for the entry and a pointer
 * to the entry in the node.
 *
 * Note: quicklistDelIndex() *requires* uncompressed nodes because you
 *       already had to get *p from an uncompressed node somewhere.
 *
 * Returns 1 if the entire node was deleted, 0 if node still exists.
 * Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p) {
    int gone = 0;

    // As before, the lowest level delegates to ziplist for the deletion (following the ziplist encoding)
    node->zl = ziplistDelete(node->zl, p);
    node->count--;
    // If the node has no entries left, remove the node; otherwise update its sz
    if (node->count == 0) {
        gone = 1;
        __quicklistDelNode(quicklist, node);
    } else {
        quicklistNodeUpdateSz(node);
    }
    quicklist->count--;
    /* If we deleted the node, the original node is no longer valid */
    return gone ? 1 : 0;
}
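Overall, deletion is an iterate, compare, delete loop. There are quite a few details, but they all reuse techniques covered earlier, so they need no further discussion.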
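To make the count rule of LREM concrete, here is a minimal sketch of the same iterate, compare, delete loop over a plain int array. It is an illustration under simplifying assumptions, not the Redis implementation: `remove_matching` is a hypothetical name, values are ints rather than ziplist entries, and the array is compacted in place instead of deleting through an iterator.

```c
#include <assert.h>
#include <stddef.h>

/* Removes elements equal to value from arr[0..*len), following LREM's rule:
 *   count > 0: remove the first count matches, scanning from the head;
 *   count < 0: remove the first |count| matches, scanning from the tail;
 *   count == 0: remove every match.
 * Returns the number of removed elements and updates *len. */
static size_t remove_matching(int *arr, size_t *len, long count, int value) {
    assert(arr != NULL && len != NULL);
    /* limit == 0 means "no limit"; unsigned negation avoids overflow. */
    size_t limit = count < 0 ? 0 - (size_t)count : (size_t)count;
    size_t removed = 0;

    if (count >= 0) {
        size_t w = 0; /* write index for kept items */
        for (size_t r = 0; r < *len; r++) {
            if (arr[r] == value && (limit == 0 || removed < limit)) {
                removed++;
            } else {
                arr[w++] = arr[r];
            }
        }
        *len = w;
    } else {
        size_t w = *len; /* kept items are first packed toward the end */
        for (size_t r = *len; r-- > 0;) {
            if (arr[r] == value && removed < limit) {
                removed++;
            } else {
                arr[--w] = arr[r];
            }
        }
        /* Shift the kept block back to the front of the array. */
        for (size_t i = 0; i + w < *len; i++) arr[i] = arr[i + w];
        *len -= removed;
    }
    return removed;
}
```

For example, on {1, 2, 1, 3, 1} with value 1, a count of 2 leaves {2, 3, 1}, while a count of -2 leaves {1, 2, 3}. That mirrors why lremCommand creates its iterator from the tail when the count is negative.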
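5. LPOP: popping from the list
This is much like deletion: LPOP removes the first element of the list and returns it (RPOP does the same for the last element). In practice this is the kind of Redis feature we like using most. A quick look: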
// Usage: LPOP key
// t_list.c
void lpopCommand(client *c) {
    popGenericCommand(c,LIST_HEAD);
}

void popGenericCommand(client *c, int where) {
    robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
    if (o == NULL || checkType(c,o,OBJ_LIST)) return;

    // Pop the element; this is the function to focus on
    robj *value = listTypePop(o,where);
    if (value == NULL) {
        addReply(c,shared.nullbulk);
    } else {
        char *event = (where == LIST_HEAD) ? "lpop" : "rpop";

        addReplyBulk(c,value);
        decrRefCount(value);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
        if (listTypeLength(o) == 0) {
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                c->argv[1],c->db->id);
            dbDelete(c->db,c->argv[1]);
        }
        signalModifiedKey(c->db,c->argv[1]);
        server.dirty++;
    }
}

// t_list.c
robj *listTypePop(robj *subject, int where) {
    long long vlong;
    robj *value = NULL;

    int ql_where = where == LIST_HEAD ? QUICKLIST_HEAD : QUICKLIST_TAIL;
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        if (quicklistPopCustom(subject->ptr, ql_where, (unsigned char **)&value,
                               NULL, &vlong, listPopSaver)) {
            if (!value)
                value = createStringObjectFromLongLong(vlong);
        }
    } else {
        serverPanic("Unknown list encoding");
    }
    return value;
}

// quicklist.c
/* pop from quicklist and return result in 'data' ptr. Value of 'data'
 * is the return value of 'saver' function pointer if the data is NOT a number.
 *
 * If the quicklist element is a long long, then the return value is returned in
 * 'sval'.
 *
 * Return value of 0 means no elements available.
 * Return value of 1 means check 'data' and 'sval' for values.
 * If 'data' is set, use 'data' and 'sz'. Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
                       unsigned int *sz, long long *sval,
                       void *(*saver)(unsigned char *data, unsigned int sz)) {
    unsigned char *p;
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int pos = (where == QUICKLIST_HEAD) ? 0 : -1;

    if (quicklist->count == 0)
        return 0;

    if (data)
        *data = NULL;
    if (sz)
        *sz = 0;
    if (sval)
        *sval = -123456789;

    quicklistNode *node;
    // Pick the quicklist's head node or tail node
    if (where == QUICKLIST_HEAD && quicklist->head) {
        node = quicklist->head;
    } else if (where == QUICKLIST_TAIL && quicklist->tail) {
        node = quicklist->tail;
    } else {
        return 0;
    }

    // Fetch the first or last entry of that node's ziplist
    p = ziplistIndex(node->zl, pos);
    if (ziplistGet(p, &vstr, &vlen, &vlong)) {
        if (vstr) {
            if (data)
                // Build the return value through saver (a string object in the list case)
                *data = saver(vstr, vlen);
            if (sz)
                *sz = vlen;
        } else {
            if (data)
                *data = NULL;
            if (sval)
                *sval = vlong;
        }
        // Delete the entry now that its value has been read
        quicklistDelIndex(quicklist, node, &p);
        return 1;
    }
    return 0;
}
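Popping an element takes roughly three steps:
1. Get the head node or the tail node of the quicklist;
2. Read the first or the last entry from that node's ziplist;
3. Delete that entry from the ziplist (the node itself is removed only if it becomes empty).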
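The three steps above can be sketched on a toy chunked list. This is only an illustration under stated assumptions: `node`, `chunked_list`, `chunked_push_tail`, `chunked_pop` and `NODE_CAP` are hypothetical names, each node stores a fixed int array rather than a ziplist, and there is no compression or integer/string distinction.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define NODE_CAP 4 /* hypothetical per-node capacity */

typedef struct node {
    struct node *prev, *next;
    int items[NODE_CAP];
    size_t count;
} node;

typedef struct chunked_list {
    node *head, *tail;
    size_t count; /* total items across all nodes */
} chunked_list;

/* Appends value at the tail, allocating a node when the tail is full.
 * Returns 1 on success, 0 if allocation fails. */
static int chunked_push_tail(chunked_list *l, int value) {
    assert(l != NULL);
    if (l->tail == NULL || l->tail->count == NODE_CAP) {
        node *n = calloc(1, sizeof(*n));
        if (n == NULL) return 0;
        n->prev = l->tail;
        if (l->tail) l->tail->next = n; else l->head = n;
        l->tail = n;
    }
    l->tail->items[l->tail->count++] = value;
    l->count++;
    return 1;
}

/* Pops from the head (from_head != 0) or from the tail.
 * Returns 1 and stores the value in *out, or 0 if the list is empty. */
static int chunked_pop(chunked_list *l, int from_head, int *out) {
    assert(l != NULL && out != NULL);
    /* Step 1: pick the head or tail node. */
    node *n = from_head ? l->head : l->tail;
    if (n == NULL) return 0;
    /* Step 2: read the first or last item of that node. */
    size_t pos = from_head ? 0 : n->count - 1;
    *out = n->items[pos];
    /* Step 3: delete the item, closing the gap when it was at the front. */
    if (from_head)
        memmove(&n->items[0], &n->items[1], (n->count - 1) * sizeof(n->items[0]));
    n->count--;
    l->count--;
    /* The node is unlinked and freed only once it holds nothing. */
    if (n->count == 0) {
        if (n->prev) n->prev->next = n->next; else l->head = n->next;
        if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
        free(n);
    }
    return 1;
}
```

Because nodes are freed as soon as they empty, a non-NULL head or tail always has at least one item, which keeps the pop path free of special cases.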
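6. BLPOP: blocking pop
This is effectively a blocking queue. In an ordinary programming language we would implement blocking with a lock plus a wait/notify mechanism. What we want to see here is how Redis does it.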
// Usage: BLPOP key1 [key2] timeout
// t_list.c, where the left/right variants again share code
void blpopCommand(client *c) {
    blockingPopGenericCommand(c,LIST_HEAD);
}

/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
    robj *o;
    mstime_t timeout;
    int j;

    if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
        != C_OK) return;

    // Check each of the given keys in turn
    for (j = 1; j < c->argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (o->type != OBJ_LIST) {
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                // If the list has data, behave exactly like the non-blocking version and reply at once
                if (listTypeLength(o) != 0) {
                    /* Non empty list, this is like a non normal [LR]POP. */
                    char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
                    robj *value = listTypePop(o,where);
                    serverAssert(value != NULL);

                    addReplyMultiBulkLen(c,2);
                    addReplyBulk(c,c->argv[j]);
                    addReplyBulk(c,value);
                    decrRefCount(value);
                    notifyKeyspaceEvent(NOTIFY_LIST,event,
                                        c->argv[j],c->db->id);
                    if (listTypeLength(o) == 0) {
                        dbDelete(c->db,c->argv[j]);
                        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
                                            c->argv[j],c->db->id);
                    }
                    signalModifiedKey(c->db,c->argv[j]);
                    server.dirty++;

                    /* Replicate it as an [LR]POP instead of B[LR]POP. */
                    rewriteClientCommandVector(c,2,
                        (where == LIST_HEAD) ? shared.lpop : shared.rpop,
                        c->argv[j]);
                    // A value was found, so the command ends here
                    return;
                }
            }
        }
    }

    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & CLIENT_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }

    /* If the list is empty or the key does not exists we must block */
    // This is where the blocking is set up
    blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exists or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for this keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, accordingly
 *   to the number of elements we have in the ready list.
 */

/* Set a client in blocking mode for the specified key, with the specified
 * timeout */
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
    dictEntry *de;
    list *l;
    int j;

    c->bpop.timeout = timeout;
    c->bpop.target = target;

    if (target != NULL) incrRefCount(target);

    // Register the client as a waiter on every key
    for (j = 0; j < numkeys; j++) {
        /* If the key already exists in the dict ignore it. */
        if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
        incrRefCount(keys[j]);

        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blocking_keys,keys[j]);
        if (de == NULL) {
            int retval;

            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            // Record the key in db->blocking_keys; a later PUSH to this key marks it ready
            retval = dictAdd(c->db->blocking_keys,keys[j],l);
            incrRefCount(keys[j]);
            serverAssertWithInfo(c,keys[j],retval == DICT_OK);
        } else {
            l = dictGetVal(de);
        }
        // Append this client to the key's list of waiters (first blocked, first served)
        listAddNodeTail(l,c);
    }
    // Blocking the client only sets a flag; the client then waits until the key receives data or the timeout expires
    blockClient(c,BLOCKED_LIST);
}

// block.c: set the blocked flag
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
 * flag is set client query buffer is not longer processed, but accumulated,
 * and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
    c->flags |= CLIENT_BLOCKED;
    c->btype = btype;
    server.bpop_blocked_clients++;
}
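How Redis implements blocking: the db->blocking_keys dictionary maps each key to the list of clients blocked on it, and the client itself is only flagged as CLIENT_BLOCKED. As the source comment above explains, when a PUSH hits a key that has waiting clients, the key is marked "ready"; once the current command, MULTI/EXEC block, or script has finished, the waiting clients are served in the order they blocked, for as many elements as the list holds. Clients that are not served stay blocked.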
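The idea of "block by registering, wake by pushing" can be sketched without any threads or locks. The following is a deliberately simplified model, not the Redis code: `waiter`, `keyed_queue`, `kq_bpop` and `kq_push` are hypothetical names, a single key with fixed-size arrays stands in for db->blocking_keys, and the push hands the value to the oldest waiter immediately instead of marking the key ready and serving it after the command.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WAITERS 8 /* hypothetical limits for the sketch */
#define MAX_ITEMS 8

/* Hypothetical client: 'blocked' plays the role of the CLIENT_BLOCKED flag. */
typedef struct waiter {
    int blocked;
    int has_value;
    int value;
} waiter;

/* State for one key: stored items plus a FIFO of clients blocked on it. */
typedef struct keyed_queue {
    int items[MAX_ITEMS];
    size_t nitems;
    waiter *waiters[MAX_WAITERS];
    size_t nwaiters;
} keyed_queue;

/* Blocking pop: reply at once if data exists, otherwise register the client.
 * Returns 1 if served immediately, 0 if the client is now blocked,
 * -1 if the waiter table is full. */
static int kq_bpop(keyed_queue *q, waiter *w) {
    assert(q != NULL && w != NULL);
    w->has_value = 0;
    if (q->nitems > 0) {
        w->value = q->items[0];
        for (size_t i = 1; i < q->nitems; i++) q->items[i - 1] = q->items[i];
        q->nitems--;
        w->has_value = 1;
        w->blocked = 0;
        return 1;
    }
    if (q->nwaiters == MAX_WAITERS) return -1;
    q->waiters[q->nwaiters++] = w; /* tail insert keeps first-blocked-first-served order */
    w->blocked = 1;
    return 0;
}

/* Push: hand the value to the longest-waiting client, else store it.
 * Returns 1 on success, 0 if the item storage is full. */
static int kq_push(keyed_queue *q, int value) {
    assert(q != NULL);
    if (q->nwaiters > 0) {
        waiter *w = q->waiters[0];
        for (size_t i = 1; i < q->nwaiters; i++) q->waiters[i - 1] = q->waiters[i];
        q->nwaiters--;
        w->value = value;
        w->has_value = 1;
        w->blocked = 0;
        return 1;
    }
    if (q->nitems == MAX_ITEMS) return 0;
    q->items[q->nitems++] = value;
    return 1;
}
```

Nothing in this model ever sleeps: a "blocked" client is just an entry in a list whose reply is deferred, which is why a single-threaded event loop can support blocking commands.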
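Food for thought: the implementation above leaves one question open. How does Redis guarantee that a client waits at most `timeout`, or at most some number of iterations? How would you handle it?
OK, that completes our walkthrough of the list data structure.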
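One possible answer, offered as a sketch rather than as a description of what Redis does: store an absolute deadline when the client blocks (blockForKeys above already saves c->bpop.timeout) and have a periodic housekeeping pass unblock any client whose deadline has passed. All names below (`ms_time`, `blocked_client`, `deadline_from_timeout`, `expire_blocked`) are hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef long long ms_time; /* milliseconds; hypothetical stand-in for mstime_t */

typedef struct blocked_client {
    int blocked;
    int timed_out;
    ms_time deadline; /* absolute time in ms; 0 means wait forever */
} blocked_client;

/* Converts a relative timeout in seconds into an absolute deadline.
 * A timeout of 0 keeps the "block indefinitely" meaning. */
static ms_time deadline_from_timeout(ms_time now, long timeout_seconds) {
    assert(timeout_seconds >= 0);
    return timeout_seconds == 0 ? 0 : now + (ms_time)timeout_seconds * 1000;
}

/* Meant to be called periodically: unblocks every client whose deadline
 * has passed. Returns the number of clients that timed out in this pass. */
static size_t expire_blocked(blocked_client *clients, size_t n, ms_time now) {
    size_t expired = 0;
    for (size_t i = 0; i < n; i++) {
        blocked_client *c = &clients[i];
        if (c->blocked && c->deadline != 0 && c->deadline < now) {
            c->blocked = 0;
            c->timed_out = 1;
            expired++;
        }
    }
    return expired;
}
```

With an absolute deadline the check is a single comparison per client, and the wait is bounded by the timeout plus the interval between housekeeping passes rather than by a loop count.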