Redis之資料庫實現原始碼閱讀
lookupKey:查詢指定的鍵,如果存在返回對應的值
robj *lookupKey(redisDb *db, robj *key) { // 查詢鍵空間 dictEntry *de = dictFind(db->dict,key->ptr); // 節點存在 if (de) { // 取出值 robj *val = dictGetVal(de); /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ // 更新時間資訊(只在不存在子程序時執行,防止破壞 copy-on-write 機制) if (server.rdb_child_pid == -1 && server.aof_child_pid == -1) val->lru = LRU_CLOCK(); // 返回值 return val; } else { // 節點不存在 return NULL; } }
lookupKeyRead:首先會檢查key是否過期,然後從資料庫中取出該值,並且更新命中/不明中資訊
robj *lookupKeyRead(redisDb *db, robj *key) { robj *val; // 檢查 key 釋放已經過期 expireIfNeeded(db,key); // 從資料庫中取出鍵的值 val = lookupKey(db,key); // 更新命中/不命中資訊 if (val == NULL) server.stat_keyspace_misses++; else server.stat_keyspace_hits++; // 返回值 return val; }
lookupKeyWrite
robj *lookupKeyWrite(redisDb *db, robj *key) {
// 刪除過期鍵
expireIfNeeded(db,key);
// 查詢並返回 key 的值物件
return lookupKey(db,key);
}
lookupKeyReadOrReply:為執行讀取操作而從資料庫中查詢返回 key 的值。
robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) { // 查詢 robj *o = lookupKeyRead(c->db, key); // 決定是否傳送資訊 if (!o) addReply(c,reply); return o; }
lookupKeyWriteOrReply:為執行寫入操作而從資料庫中查詢返回 key 的值。
robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
robj *o = lookupKeyWrite(c->db, key);
if (!o) addReply(c,reply);
return o;
}
dbAdd:嘗試將鍵值對 key 和 val 新增到資料庫中。
void dbAdd(redisDb *db, robj *key, robj *val) {
// 複製鍵名
sds copy = sdsdup(key->ptr);
// 嘗試新增鍵值對
int retval = dictAdd(db->dict, copy, val);
// 如果鍵已經存在,那麼停止
redisAssertWithInfo(NULL,key,retval == REDIS_OK);
// 如果開啟了叢集模式,那麼將鍵儲存到槽裡面
if (server.cluster_enabled) slotToKeyAdd(key);
}
setKey:高層次的 SET 操作函式。
void setKey(redisDb *db, robj *key, robj *val) {
// 新增或覆寫資料庫中的鍵值對
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
}
incrRefCount(val);
// 移除鍵的過期時間
removeExpire(db,key);
// 傳送鍵修改通知
signalModifiedKey(db,key);
}
dbExists:檢查鍵 key 是否存在於資料庫中,存在返回 1 ,不存在返回 0 。
int dbExists(redisDb *db, robj *key) {
return dictFind(db->dict,key->ptr) != NULL;
}
dbRandomKey:隨機從資料庫中取出一個鍵,並以字串物件的方式返回這個鍵。
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
while(1) {
sds key;
robj *keyobj;
// 從鍵空間中隨機取出一個鍵節點
de = dictGetRandomKey(db->dict);
// 資料庫為空
if (de == NULL) return NULL;
// 取出鍵
key = dictGetKey(de);
// 為鍵建立一個字串物件,物件的值為鍵的名字
keyobj = createStringObject(key,sdslen(key));
// 檢查鍵是否帶有過期時間
if (dictFind(db->expires,key)) {
// 如果鍵已經過期,那麼將它刪除,並繼續隨機下個鍵
if (expireIfNeeded(db,keyobj)) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
}
// 返回被隨機到的鍵(的名字)
return keyobj;
}
}
dbDelete:從資料庫中刪除給定的鍵,鍵的值,以及鍵的過期時間。
int dbDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
// 刪除鍵的過期時間
if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
// 刪除鍵值對
if (dictDelete(db->dict,key->ptr) == DICT_OK) {
// 如果開啟了叢集模式,那麼從槽中刪除給定的鍵
if (server.cluster_enabled) slotToKeyDel(key);
return 1;
} else {
// 鍵不存在
return 0;
}
}
emptyDb:清空伺服器的所有資料。
long long emptyDb(void(callback)(void*)) {
int j;
long long removed = 0;
// 清空所有資料庫
for (j = 0; j < server.dbnum; j++) {
// 記錄被刪除鍵的數量
removed += dictSize(server.db[j].dict);
// 刪除所有鍵值對
dictEmpty(server.db[j].dict,callback);
// 刪除所有鍵的過期時間
dictEmpty(server.db[j].expires,callback);
}
// 如果開啟了叢集模式,那麼還要移除槽記錄
if (server.cluster_enabled) slotToKeyFlush();
// 返回鍵的數量
return removed;
}
selectDb:將客戶端的目標資料庫切換為 id 所指定的資料庫
int selectDb(redisClient *c, int id) {
// 確保 id 在正確範圍內
if (id < 0 || id >= server.dbnum)
return REDIS_ERR;
// 切換資料庫(更新指標)
c->db = &server.db[id];
return REDIS_OK;
}
signalModifiedKey:每當資料庫中的鍵被改動時, signalModifiedKey() 函式都會被呼叫。
void signalModifiedKey(redisDb *db, robj *key) {
touchWatchedKey(db,key);
}
signalFlushedDb:每當一個數據庫被清空時, signalFlushDb() 都會被呼叫。
void signalFlushedDb(int dbid) {
touchWatchedKeysOnFlush(dbid);
}
flushdbCommand:清空客戶端指定的資料庫
void flushdbCommand(redisClient *c) {
server.dirty += dictSize(c->db->dict);
// 傳送通知
signalFlushedDb(c->db->id);
// 清空指定資料庫中的 dict 和 expires 字典
dictEmpty(c->db->dict,NULL);
dictEmpty(c->db->expires,NULL);
// 如果開啟了叢集模式,那麼還要移除槽記錄
if (server.cluster_enabled) slotToKeyFlush();
addReply(c,shared.ok);
}
flushallCommand:清空伺服器中的所有資料庫
void flushallCommand(redisClient *c) {
// 傳送通知
signalFlushedDb(-1);
// 清空所有資料庫
server.dirty += emptyDb(NULL);
addReply(c,shared.ok);
// 如果正在儲存新的 RDB ,那麼取消儲存操作
if (server.rdb_child_pid != -1) {
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
// 更新 RDB 檔案
if (server.saveparamslen > 0) {
/* Normally rdbSave() will reset dirty, but we don't want this here
* as otherwise FLUSHALL will not be replicated nor put into the AOF. */
// rdbSave() 會清空伺服器的 dirty 屬性
// 但為了確保 FLUSHALL 命令會被正常傳播,
// 程式需要儲存並在 rdbSave() 呼叫之後還原伺服器的 dirty 屬性
int saved_dirty = server.dirty;
rdbSave(server.rdb_filename);
server.dirty = saved_dirty;
}
server.dirty++;
}
delCommand:刪除客戶端指定的鍵
void delCommand(redisClient *c) {
int deleted = 0, j;
// 遍歷所有輸入鍵
for (j = 1; j < c->argc; j++) {
// 先刪除過期的鍵
expireIfNeeded(c->db,c->argv[j]);
// 嘗試刪除鍵
if (dbDelete(c->db,c->argv[j])) {
// 刪除鍵成功,傳送通知
signalModifiedKey(c->db,c->argv[j]);
notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
// 成功刪除才增加 deleted 計數器的值
deleted++;
}
}
// 返回被刪除鍵的數量
addReplyLongLong(c,deleted);
}
existsCommand:檢查一個鍵是否存在
void existsCommand(redisClient *c) {
// 檢查鍵是否已經過期,如果已過期的話,那麼將它刪除
// 這可以避免已過期的鍵被誤認為存在
expireIfNeeded(c->db,c->argv[1]);
// 在資料庫中查詢
if (dbExists(c->db,c->argv[1])) {
addReply(c, shared.cone);
} else {
addReply(c, shared.czero);
}
}
selectCommand:修改客戶端的當前資料庫
void selectCommand(redisClient *c) {
long id;
// 不合法的資料庫號碼
if (getLongFromObjectOrReply(c, c->argv[1], &id,
"invalid DB index") != REDIS_OK)
return;
if (server.cluster_enabled && id != 0) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
// 切換資料庫
if (selectDb(c,id) == REDIS_ERR) {
addReplyError(c,"invalid DB index");
} else {
addReply(c,shared.ok);
}
}
randomkeyCommand:隨即返回一個鍵
void randomkeyCommand(redisClient *c) {
robj *key;
// 隨機返回鍵
if ((key = dbRandomKey(c->db)) == NULL) {
addReply(c,shared.nullbulk);
return;
}
addReplyBulk(c,key);
decrRefCount(key);
}
keysCommand:返回和客戶端指定的模式一致的所有鍵
void keysCommand(redisClient *c) {
dictIterator *di;
dictEntry *de;
// 模式
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
void *replylen = addDeferredMultiBulkLength(c);
// 遍歷整個資料庫,返回(名字)和模式匹配的鍵
di = dictGetSafeIterator(c->db->dict);
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
while((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de);
robj *keyobj;
// 將鍵名和模式進行比對
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
// 建立一個儲存鍵名字的字串物件
keyobj = createStringObject(key,sdslen(key));
// 刪除已過期鍵
if (expireIfNeeded(c->db,keyobj) == 0) {
addReplyBulk(c,keyobj);
numkeys++;
}
decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c,replylen,numkeys);
}
removeExpire:移除鍵 key 的過期時間
int removeExpire(redisDb *db, robj *key) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
// 確保鍵帶有過期時間
redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
// 刪除過期時間
return dictDelete(db->expires,key->ptr) == DICT_OK;
}
setExpire:將鍵 key 的過期時間設為 when
void setExpire(redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
/* Reuse the sds from the main dict in the expire dict */
// 取出鍵
kde = dictFind(db->dict,key->ptr);
redisAssertWithInfo(NULL,key,kde != NULL);
// 根據鍵取出鍵的過期時間
de = dictReplaceRaw(db->expires,dictGetKey(kde));
// 設定鍵的過期時間
// 這裡是直接使用整數值來儲存過期時間,不是用 INT 編碼的 String 物件
dictSetSignedIntegerVal(de,when);
}
getExpire:返回給定 key 的過期時間。
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
// 獲取鍵的過期時間
// 如果過期時間不存在,那麼直接返回
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,key->ptr)) == NULL) return -1;
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
// 返回過期時間
return dictGetSignedIntegerVal(de);
}
propagateExpire:將過期時間傳播到附屬節點和 AOF 檔案。
void propagateExpire(redisDb *db, robj *key) {
robj *argv[2];
// 構造一個 DEL key 命令
argv[0] = shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
// 傳播到 AOF
if (server.aof_state != REDIS_AOF_OFF)
feedAppendOnlyFile(server.delCommand,db->id,argv,2);
// 傳播到所有附屬節點
replicationFeedSlaves(server.slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
expireIfNeeded:檢查 key 是否已經過期,如果是的話,將它從資料庫中刪除。
int expireIfNeeded(redisDb *db, robj *key) {
// 取出鍵的過期時間
mstime_t when = getExpire(db,key);
mstime_t now;
// 沒有過期時間
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
// 如果伺服器正在進行載入,那麼不進行任何過期檢查
if (server.loading) return 0;
/* If we are in the context of a Lua script, we claim that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();
/* If we are running in the context of a slave, return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
// 當伺服器執行在 replication 模式時
// 附屬節點並不主動刪除 key
// 它只返回一個邏輯上正確的返回值
// 真正的刪除操作要等待主節點發來刪除命令時才執行
// 從而保證資料的同步
if (server.masterhost != NULL) return now > when;
// 執行到這裡,表示鍵帶有過期時間,並且伺服器為主節點
/* Return when this key has not expired */
// 如果未過期,返回 0
if (now <= when) return 0;
/* Delete the key */
server.stat_expiredkeys++;
// 向 AOF 檔案和附屬節點傳播過期資訊
propagateExpire(db,key);
// 傳送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
"expired",key,db->id);
// 將過期鍵從資料庫中刪除
return dbDelete(db,key);
}