1. 程式人生 > >Redis之資料庫實現原始碼閱讀

Redis之資料庫實現原始碼閱讀

lookupKey:查詢指定的鍵,如果存在返回對應的值

robj *lookupKey(redisDb *db, robj *key) {

    // 查詢鍵空間
    dictEntry *de = dictFind(db->dict,key->ptr);

    // 節點存在
    if (de) {
        

        // 取出值
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        // 更新時間資訊(只在不存在子程序時執行,防止破壞 copy-on-write 機制)
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            val->lru = LRU_CLOCK();

        // 返回值
        return val;
    } else {

        // 節點不存在

        return NULL;
    }
}

lookupKeyRead:首先會檢查key是否過期,然後從資料庫中取出該值,並且更新命中/不明中資訊

robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;

    // 檢查 key 釋放已經過期
    expireIfNeeded(db,key);

    // 從資料庫中取出鍵的值
    val = lookupKey(db,key);

    // 更新命中/不命中資訊
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;

    // 返回值
    return val;
}

lookupKeyWrite

robj *lookupKeyWrite(redisDb *db, robj *key) {

    // 刪除過期鍵
    expireIfNeeded(db,key);

    // 查詢並返回 key 的值物件
    return lookupKey(db,key);
}

lookupKeyReadOrReply:為執行讀取操作而從資料庫中查詢返回 key 的值。

robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {

    // 查詢
    robj *o = lookupKeyRead(c->db, key);

    // 決定是否傳送資訊
    if (!o) addReply(c,reply);

    return o;
}

lookupKeyWriteOrReply:為執行寫入操作而從資料庫中查詢返回 key 的值。

robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {

    robj *o = lookupKeyWrite(c->db, key);

    if (!o) addReply(c,reply);

    return o;
}

dbAdd:嘗試將鍵值對 key 和 val 新增到資料庫中。

void dbAdd(redisDb *db, robj *key, robj *val) {

    // 複製鍵名
    sds copy = sdsdup(key->ptr);

    // 嘗試新增鍵值對
    int retval = dictAdd(db->dict, copy, val);

    // 如果鍵已經存在,那麼停止
    redisAssertWithInfo(NULL,key,retval == REDIS_OK);

    // 如果開啟了叢集模式,那麼將鍵儲存到槽裡面
    if (server.cluster_enabled) slotToKeyAdd(key);
 }

setKey:高層次的 SET 操作函式。

void setKey(redisDb *db, robj *key, robj *val) {

    // 新增或覆寫資料庫中的鍵值對
    if (lookupKeyWrite(db,key) == NULL) {
        dbAdd(db,key,val);
    } else {
        dbOverwrite(db,key,val);
    }

    incrRefCount(val);

    // 移除鍵的過期時間
    removeExpire(db,key);

    // 傳送鍵修改通知
    signalModifiedKey(db,key);
}

dbExists:檢查鍵 key 是否存在於資料庫中,存在返回 1 ,不存在返回 0 。

int dbExists(redisDb *db, robj *key) {
    return dictFind(db->dict,key->ptr) != NULL;
}

dbRandomKey:隨機從資料庫中取出一個鍵,並以字串物件的方式返回這個鍵。

robj *dbRandomKey(redisDb *db) {
    dictEntry *de;

    while(1) {
        sds key;
        robj *keyobj;

        // 從鍵空間中隨機取出一個鍵節點
        de = dictGetRandomKey(db->dict);

        // 資料庫為空
        if (de == NULL) return NULL;

        // 取出鍵
        key = dictGetKey(de);
        // 為鍵建立一個字串物件,物件的值為鍵的名字
        keyobj = createStringObject(key,sdslen(key));
        // 檢查鍵是否帶有過期時間
        if (dictFind(db->expires,key)) {
            // 如果鍵已經過期,那麼將它刪除,並繼續隨機下個鍵
            if (expireIfNeeded(db,keyobj)) {
                decrRefCount(keyobj);
                continue; /* search for another key. This expired. */
            }
        }

        // 返回被隨機到的鍵(的名字)
        return keyobj;
    }
}

dbDelete:從資料庫中刪除給定的鍵,鍵的值,以及鍵的過期時間。

int dbDelete(redisDb *db, robj *key) {

    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    // 刪除鍵的過期時間
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    // 刪除鍵值對
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        // 如果開啟了叢集模式,那麼從槽中刪除給定的鍵
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        // 鍵不存在
        return 0;
    }
}

emptyDb:清空伺服器的所有資料。

long long emptyDb(void(callback)(void*)) {
    int j;
    long long removed = 0;

    // 清空所有資料庫
    for (j = 0; j < server.dbnum; j++) {

        // 記錄被刪除鍵的數量
        removed += dictSize(server.db[j].dict);

        // 刪除所有鍵值對
        dictEmpty(server.db[j].dict,callback);
        // 刪除所有鍵的過期時間
        dictEmpty(server.db[j].expires,callback);
    }

    // 如果開啟了叢集模式,那麼還要移除槽記錄
    if (server.cluster_enabled) slotToKeyFlush();

    // 返回鍵的數量
    return removed;
}

selectDb:將客戶端的目標資料庫切換為 id 所指定的資料庫

int selectDb(redisClient *c, int id) {

    // 確保 id 在正確範圍內
    if (id < 0 || id >= server.dbnum)
        return REDIS_ERR;

    // 切換資料庫(更新指標)
    c->db = &server.db[id];

    return REDIS_OK;
}

signalModifiedKey:每當資料庫中的鍵被改動時, signalModifiedKey() 函式都會被呼叫。

void signalModifiedKey(redisDb *db, robj *key) {
    touchWatchedKey(db,key);
}

signalFlushedDb:每當一個數據庫被清空時, signalFlushDb() 都會被呼叫。

void signalFlushedDb(int dbid) {
    touchWatchedKeysOnFlush(dbid);
}

flushdbCommand:清空客戶端指定的資料庫

void flushdbCommand(redisClient *c) {

    server.dirty += dictSize(c->db->dict);

    // 傳送通知
    signalFlushedDb(c->db->id);

    // 清空指定資料庫中的 dict 和 expires 字典
    dictEmpty(c->db->dict,NULL);
    dictEmpty(c->db->expires,NULL);

    // 如果開啟了叢集模式,那麼還要移除槽記錄
    if (server.cluster_enabled) slotToKeyFlush();

    addReply(c,shared.ok);
}

flushallCommand:清空伺服器中的所有資料庫

void flushallCommand(redisClient *c) {

    // 傳送通知
    signalFlushedDb(-1);

    // 清空所有資料庫
    server.dirty += emptyDb(NULL);
    addReply(c,shared.ok);

    // 如果正在儲存新的 RDB ,那麼取消儲存操作
    if (server.rdb_child_pid != -1) {
        kill(server.rdb_child_pid,SIGUSR1);
        rdbRemoveTempFile(server.rdb_child_pid);
    }

    // 更新 RDB 檔案
    if (server.saveparamslen > 0) {
        /* Normally rdbSave() will reset dirty, but we don't want this here
         * as otherwise FLUSHALL will not be replicated nor put into the AOF. */
        // rdbSave() 會清空伺服器的 dirty 屬性
        // 但為了確保 FLUSHALL 命令會被正常傳播,
        // 程式需要儲存並在 rdbSave() 呼叫之後還原伺服器的 dirty 屬性
        int saved_dirty = server.dirty;

        rdbSave(server.rdb_filename);

        server.dirty = saved_dirty;
    }

    server.dirty++;
}

delCommand:刪除客戶端指定的鍵

void delCommand(redisClient *c) {
    int deleted = 0, j;

    // 遍歷所有輸入鍵
    for (j = 1; j < c->argc; j++) {

        // 先刪除過期的鍵
        expireIfNeeded(c->db,c->argv[j]);

        // 嘗試刪除鍵
        if (dbDelete(c->db,c->argv[j])) {

            // 刪除鍵成功,傳送通知

            signalModifiedKey(c->db,c->argv[j]);
            notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
                "del",c->argv[j],c->db->id);

            server.dirty++;

            // 成功刪除才增加 deleted 計數器的值
            deleted++;
        }
    }

    // 返回被刪除鍵的數量
    addReplyLongLong(c,deleted);
}

existsCommand:檢查一個鍵是否存在

void existsCommand(redisClient *c) {

    // 檢查鍵是否已經過期,如果已過期的話,那麼將它刪除
    // 這可以避免已過期的鍵被誤認為存在
    expireIfNeeded(c->db,c->argv[1]);

    // 在資料庫中查詢
    if (dbExists(c->db,c->argv[1])) {
        addReply(c, shared.cone);
    } else {
        addReply(c, shared.czero);
    }
}

selectCommand:修改客戶端的當前資料庫

void selectCommand(redisClient *c) {
    long id;

    // 不合法的資料庫號碼
    if (getLongFromObjectOrReply(c, c->argv[1], &id,
        "invalid DB index") != REDIS_OK)
        return;

    if (server.cluster_enabled && id != 0) {
        addReplyError(c,"SELECT is not allowed in cluster mode");
        return;
    }

    // 切換資料庫
    if (selectDb(c,id) == REDIS_ERR) {
        addReplyError(c,"invalid DB index");
    } else {
        addReply(c,shared.ok);
    }
}

randomkeyCommand:隨即返回一個鍵

void randomkeyCommand(redisClient *c) {
    robj *key;

    // 隨機返回鍵
    if ((key = dbRandomKey(c->db)) == NULL) {
        addReply(c,shared.nullbulk);
        return;
    }

    addReplyBulk(c,key);
    decrRefCount(key);
}

keysCommand:返回和客戶端指定的模式一致的所有鍵

void keysCommand(redisClient *c) {
    dictIterator *di;
    dictEntry *de;

    // 模式
    sds pattern = c->argv[1]->ptr;

    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addDeferredMultiBulkLength(c);

    // 遍歷整個資料庫,返回(名字)和模式匹配的鍵
    di = dictGetSafeIterator(c->db->dict);
    allkeys = (pattern[0] == '*' && pattern[1] == '\0');
    while((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);
        robj *keyobj;

        // 將鍵名和模式進行比對
        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {

            // 建立一個儲存鍵名字的字串物件
            keyobj = createStringObject(key,sdslen(key));

            // 刪除已過期鍵
            if (expireIfNeeded(c->db,keyobj) == 0) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }

            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);

    setDeferredMultiBulkLength(c,replylen,numkeys);
}

removeExpire:移除鍵 key 的過期時間

int removeExpire(redisDb *db, robj *key) {
    /* An expire may only be removed if there is a corresponding entry in the
     * main dict. Otherwise, the key will never be freed. */
    // 確保鍵帶有過期時間
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    // 刪除過期時間
    return dictDelete(db->expires,key->ptr) == DICT_OK;
}

setExpire:將鍵 key 的過期時間設為 when

void setExpire(redisDb *db, robj *key, long long when) {

    dictEntry *kde, *de;

    /* Reuse the sds from the main dict in the expire dict */
    // 取出鍵
    kde = dictFind(db->dict,key->ptr);

    redisAssertWithInfo(NULL,key,kde != NULL);

    // 根據鍵取出鍵的過期時間
    de = dictReplaceRaw(db->expires,dictGetKey(kde));

    // 設定鍵的過期時間
    // 這裡是直接使用整數值來儲存過期時間,不是用 INT 編碼的 String 物件
    dictSetSignedIntegerVal(de,when);
}

getExpire:返回給定 key 的過期時間。

long long getExpire(redisDb *db, robj *key) {
    dictEntry *de;

    /* No expire? return ASAP */
    // 獲取鍵的過期時間
    // 如果過期時間不存在,那麼直接返回
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);

    // 返回過期時間
    return dictGetSignedIntegerVal(de);
}

propagateExpire:將過期時間傳播到附屬節點和 AOF 檔案。

void propagateExpire(redisDb *db, robj *key) {
    robj *argv[2];

    // 構造一個 DEL key 命令
    argv[0] = shared.del;
    argv[1] = key;
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);

    // 傳播到 AOF 
    if (server.aof_state != REDIS_AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);

    // 傳播到所有附屬節點
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
}

expireIfNeeded:檢查 key 是否已經過期,如果是的話,將它從資料庫中刪除。

int expireIfNeeded(redisDb *db, robj *key) {

    // 取出鍵的過期時間
    mstime_t when = getExpire(db,key);
    mstime_t now;

    // 沒有過期時間
    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    // 如果伺服器正在進行載入,那麼不進行任何過期檢查
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller, 
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    // 當伺服器執行在 replication 模式時
    // 附屬節點並不主動刪除 key
    // 它只返回一個邏輯上正確的返回值
    // 真正的刪除操作要等待主節點發來刪除命令時才執行
    // 從而保證資料的同步
    if (server.masterhost != NULL) return now > when;

    // 執行到這裡,表示鍵帶有過期時間,並且伺服器為主節點

    /* Return when this key has not expired */
    // 如果未過期,返回 0
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;

    // 向 AOF 檔案和附屬節點傳播過期資訊
    propagateExpire(db,key);

    // 傳送事件通知
    notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
        "expired",key,db->id);

    // 將過期鍵從資料庫中刪除
    return dbDelete(db,key);
}