1. 程式人生 > 其它 >LRU工程實現原始碼(一):Redis 記憶體淘汰策略

LRU工程實現原始碼(一):Redis 記憶體淘汰策略

目錄

原始碼版本 Redis 6.0.0

記憶體淘汰是什麼?什麼時候記憶體淘汰

我們知道,當某個key被設定了過期時間之後,客戶端每次對該key的訪問(讀寫)都會事先檢測該key是否過期,如果過期就直接刪除;但有一些鍵只訪問一次,因此需要主動刪除,預設情況下redis每秒檢測10次,檢測的物件是所有設定了過期時間的鍵集合,每次從這個集合中隨機檢測20個鍵檢視他們是否過期,如果過期就直接刪除。

記憶體淘汰雖然也是刪除key,但是與我們設定的過期key刪除不同。

記憶體淘汰是說,如果redis已使用記憶體達到預設的limit值(你可以當做記憶體滿了),如果想要繼續儲存新的key,那麼必然要淘汰一些key來釋放記憶體空間。具體淘汰哪些key由不同的淘汰策來決定。

由於需要淘汰掉key,所以此時redis不適合用作key-value資料庫了,開啟了記憶體淘汰的redis一般用於快取中,因此很多人也把他叫快取淘汰和快取淘汰策略。

又因為記憶體淘汰的觸發時機是記憶體使用達到閾值,為了儲存新的key然後淘汰舊的key,因此也有人把記憶體淘汰策略叫做快取替換策略

記憶體淘汰策略

Redis 4.0 之前一共實現了 6 種記憶體淘汰策略,在 4.0 之後,又增加了 2 種策略。

我們可以按照是否會進行資料淘汰把它們分成兩類:

  • 不進行資料淘汰的策略,只有 noeviction 這一種。
  • 會進行淘汰的 7 種其他策略。

會進行淘汰的 7 種策略,我們可以再進一步根據淘汰候選資料集的範圍把它們分成兩類:

  • 在設定了過期時間的資料中進行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 後新增)四種。
  • 在所有資料範圍內進行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 後新增)三種。我把這 8 種策略的分類,畫到了一張圖裡:

這8種策略的簡單描述都可以在redis.conf檔案中找到

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.

翻譯下:

# MAXMEMORY POLICY:當達到最大記憶體時,Redis 將如何選擇要刪除的內容
# 到達了。 您可以從以下行為中選擇一種:
#
# volatile-lru -> Evict 使用近似 LRU,只有設定了過期時間的鍵。
# allkeys-lru -> 使用近似 LRU 驅逐所有鍵。
# volatile-lfu -> 使用近似 LFU 驅逐,只有設定了過期時間的鍵。
# allkeys-lfu -> 使用近似 LFU 驅逐所有鍵。
# volatile-random -> 隨機刪除設定了過期時間的key。
# allkeys-random -> 隨機刪除一個鍵,任何鍵。
# volatile-ttl -> 刪除過期時間最近的key(次TTL)
# noeviction -> 不要驅逐任何東西,只是在寫操作時返回一個錯誤。

通過redis.conf檔案中的內容我們可以知道預設策略是noeviction

# The default is:
#
# maxmemory-policy noeviction

Redis中的LRU淘汰演算法

限於篇幅,本文主要以LRU淘汰演算法展開講解,其他演算法流程大同小異,可在我的redis原始碼中文註釋專案中檢視

不瞭解LRU演算法基礎的可以先去百度瞭解下,很簡單

這裡就不講LRU演算法的基本知識了,我們來看Redis的LRU演算法的原理是怎麼樣的。

其實,LRU 演算法背後的想法非常樸素:它認為剛剛被訪問的資料,肯定還會被再次訪問,所以就把它放在 MRU 端;長久不訪問的資料,肯定就不會再被訪問了,所以就讓它逐漸後移到 LRU 端,在快取滿時,就優先刪除它。

不過,LRU 演算法在實際實現時,需要用連結串列管理所有的快取資料,這會帶來額外的空間開銷。而且,當有資料被訪問時,需要在連結串列上把該資料移動到 MRU 端,如果有大量資料被訪問,就會帶來很多連結串列移動操作,會很耗時,進而會降低 Redis 快取效能。

所以,在 Redis 中,LRU 演算法被做了簡化,以減輕資料淘汰對快取效能的影響。具體來說,Redis 預設會記錄每個資料的最近一次訪問的時間戳(由鍵值對資料結構 RedisObject 中的 lru 欄位記錄)。然後,Redis 在決定淘汰的資料時,第一次會隨機選出 N 個數據,把它們作為一個候選集合。接下來,Redis 會比較這 N 個數據的 lru 欄位,把 lru 欄位值最小的資料從快取中淘汰出去。

Redis 提供了一個配置引數 maxmemory-samples,這個引數就是 Redis 選出的資料個數 N。例如,我們執行如下命令,可以讓 Redis 選出 100 個數據作為候選資料集:

CONFIG SET maxmemory-samples 100

redis.conf檔案中預設配置是5


# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5

當需要再次淘汰資料時,Redis 需要挑選資料進入第一次淘汰時建立的候選集合。這兒的挑選標準是:能進入候選集合的資料的 lru 欄位值必須小於候選集合中最小的 lru 值。當有新資料進入候選資料集後,如果候選資料集中的資料個數達到了 maxmemory-samples,Redis 就把候選資料集中 lru 欄位值最小的資料淘汰出去。

這樣一來,Redis 快取不用為所有的資料維護一個大連結串列,也不用在每次資料訪問時都移動連結串列項,提升了快取的效能。

原始碼剖析

原始碼中文註釋可以在我的 github 上 的 Redis 6.0 中文註釋專案裡看到,歡迎訪問

第一步:什麼時候開始淘汰key

配置讀取
struct redisServer {
    // 可使用的最大記憶體位元組數
    unsigned long long maxmemory;   /* Max number of memory bytes to use */
    // key 淘汰策略
    int maxmemory_policy;           /* Policy for key eviction */
    // 隨機抽樣精度
    int maxmemory_samples;          /* Precision of random sampling */
}
檢查時機

執行記憶體淘汰的方法是freeMemoryIfNeeded,實現在evict.c中.

注意redis 3.0版本中該方法在redis.c

這個方法主要的作用是檢查記憶體狀態,如果記憶體達到閾值就進行記憶體淘汰,釋放記憶體空間直至到閾值以下。

開啟redis原始碼可以很輕易搜尋到該方法是被freeMemoryIfNeededAndSafe呼叫的


/* 
 * 這是 freeMemoryIfNeeded() 的包裝器,只有在現在有條件安全地執行時才真正呼叫該函式:
 * - 超時條件下不能有指令碼。
 * - 現在沒有載入資料。
 */
int freeMemoryIfNeededAndSafe(void) {
    if (server.lua_timedout || server.loading) return C_OK;
    return freeMemoryIfNeeded();
}

那麼這個方法又是被誰呼叫的呢,有兩個地方

  • updateMaxmemory
  • processCommand

第一個看名字是在更新Maxmemory時呼叫的,本文不講,我們主要來看第二個方法

在處理命令的時候,會呼叫server.c中的processCommand方法

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * 
 * 如果這個函式被呼叫,我們已經讀取了整個命令,引數在客戶端 argv/argc 欄位中。
 * 
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *  
 * processCommand() 執行命令或準備伺服器以從客戶端進行批量讀取。
 */
int processCommand(client *c) {
    // 省略...
    // 如果配置了 server.maxmemory 並且 lua指令碼沒有在超時條件下執行
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        // 省略...
    }
    // 省略...
}
getMaxmemoryState

緊接著會在freeMemoryIfNeeded方法中檢查記憶體狀態來判斷是否需要記憶體淘汰

/* 
 * 根據當前的“maxmemory”設定,定期呼叫此函式以檢視是否有可用記憶體。 如果我們超過記憶體限制,該函式將嘗試釋放一些記憶體以返回低於限制。
 *
 * 如果我們低於記憶體限制或超過限制但嘗試釋放記憶體成功,該函式將返回 C_OK。
 * 否則,如果我們超過了記憶體限制,但沒有足夠的記憶體被釋放以返回低於限制,則該函式返回 C_ERR。
 */
int freeMemoryIfNeeded(void) {
    // 預設情況下,從節點應該忽略maxmemory指令,僅僅做從節點該做的事情就好
    if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
    // 當客戶端暫停時,資料集應該是靜態的,不僅來自客戶端的 POV 無法寫入,還有來自POV過期和驅逐key也無法執行。
    if (clientsArePaused()) return C_OK;
    // 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
}

getMaxmemoryState 方法也在evict.c中:

/* 從maxmemory指令的角度獲取記憶體狀態:
 * 如果使用的記憶體低於 maxmemory 設定,則返回 C_OK。
 * 否則,如果超過記憶體限制,函式返回
 * C_ERR。
 *
 * 該函式可能會通過引用返回附加資訊,僅當
 * 指向相應引數的指標不為 NULL。某些欄位是
 * 僅在返回 C_ERR 時填充:
 *
 * 'total' 使用的總位元組數。
 *(為 C_ERR 和 C_OK 填充)
 *
 * 'logical' 使用的記憶體量減去從/AOF緩衝區。
 *(返回 C_ERR 時填充)
 *
 * 'tofree' 為了回到記憶體限制應該釋放的記憶體量
 * 
 *(返回 C_ERR 時填充)
 *
 * 'level' 這通常範圍從 0 到 1,並報告數量
 * 當前使用的記憶體。如果我們超過了記憶體限制可能會 > 1。
 *(為 C_ERR 和 C_OK 填充)
 */
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)

通過以上原始碼,我們能夠知道什麼時候會記憶體淘汰。

下一步我們來看看會淘汰哪些key

第二步:淘汰哪些key

freeMemoryIfNeeded

文章正文中我們提到過記憶體淘汰策略,但是第一步並沒有涉及到具體的策略。那麼不同的策略是在哪裡實現的呢?這是在freeMemoryIfNeeded方法中判斷的,順便這裡先帶你瞭解下freeMemoryIfNeeded方法的骨架


int freeMemoryIfNeeded(void) {
    
    /*
     * 
     * mem_reported 已使用記憶體
     * mem_tofree 需要釋放的記憶體
     * mem_freed 已釋放記憶體
     */
    size_t mem_reported, mem_tofree, mem_freed;
    
    // 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
    // 這個方法不但會計算記憶體,還會賦值 mem_reported mem_freed
    if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
        return C_OK;
    // 初始化已釋放記憶體的位元組數為 0
    mem_freed = 0;
    
    // 不進行記憶體淘汰
    if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
        goto cant_free; /* We need to free memory, but policy forbids. */
        
    // 根據 maxmemory 策略,
    // 遍歷字典,釋放記憶體並記錄被釋放記憶體的位元組數
    while (mem_freed < mem_tofree) {
        // 最佳淘汰key
        sds bestkey = NULL;
        // LRU策略或者LFU策略或者VOLATILE_TTL策略
        if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
            server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
        {
            // 不同的策略找bestKey
        }
        /* volatile-random and allkeys-random policy */
        else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
                 server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
        {
            // 不同的策略找bestKey
        }
        
        // 最後選定的要刪除的key
        if (bestkey) {
            // 在這裡刪除key
        }
    }
    
}

上面的原始碼省略了很多東西,但我相信你已經知道這個方法在做什麼了,也應該知道不同的策略其實就是在找不同的key。我們重點分析LRU策略下的bestKey選擇。

redis索引

這個時候要去redis資料庫拿東西了,查詢的話還是需要索引的,因此我先向你簡單羅列下Redis索引相關的資料結構,很簡單,不做文字表述了

server.h

// Redis 資料庫。 有多個數據庫由從 0(預設資料庫)到最大配置資料庫的整數標識。 資料庫編號是結構中的“id”欄位。
typedef struct redisDb {
    /* 資料庫鍵空間,儲存著資料庫中的所有鍵值對 */
    dict *dict;               /* The keyspace for this DB */
    // 設定超時時間的key集合
    dict *expires;              /* Timeout of keys with a timeout set */
    // 客戶端等待資料的key (BLPOP)
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/
    // 收到 PUSH 被阻塞的key
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    int id;                     /* Database ID */
    // 資料庫的鍵的平均 TTL ,統計資訊
    long long avg_ttl;          /* Average TTL, just for stats */
    // 
    unsigned long expires_cursor; /* Cursor of the active expire cycle. */
    // 嘗試逐一進行碎片整理的鍵名列表。
    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;

注意 redisDb 有兩個重要的dict,*dict是主字典,儲存所有的鍵集合, *expires儲存設定了超時時間的集合,後面根據不同的淘汰策略就可以從不同的資料集拿資料

dict.h

/*
 * 字典
 */
typedef struct dict {
    // 型別特定函式
    dictType *type;
    // 私有資料
    void *privdata;
    // 雜湊表
    dictht ht[2];
    // rehash 索引
    // 當 rehash 不在進行時,值為 -1
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    // 當前執行的迭代器數量
    unsigned long iterators; /* number of iterators currently running */
} dict;

每個dict又儲存了2個dictht,為什麼是兩個呢?用來實現漸進式 rehash

/* 雜湊表
 * 每個字典都使用兩個雜湊表,從而實現漸進式 rehash 。
 */
typedef struct dictht {
    // 雜湊表陣列
    dictEntry **table;
    // 雜湊表大小
    unsigned long size;
    // 雜湊表大小掩碼,用於計算索引值
    // 總是等於 size - 1
    unsigned long sizemask;
    // 該雜湊表已有節點的數量
    unsigned long used;
} dictht;

到了雜湊表了,這個是真正儲存元素的地方了,再來看雜湊表元素長什麼樣

/*
 * 雜湊表節點
 */
typedef struct dictEntry {
    // 鍵
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下個雜湊表節點,形成連結串列
    struct dictEntry *next;
} dictEntry;

以上就是redis索引相關的資料結構,我們可以看到redis索引是通過雜湊表來實現的。上面說的是索引的儲存,那麼我們放入redis的value儲存在哪呢?

server.h

typedef struct redisObject {
    // 型別
    unsigned type:4;

    // 編碼
    unsigned encoding:4;

    // 物件最後一次被訪問的時間
    unsigned lru:LRU_BITS;

    // 引用計數
    int refcount;

    // 指向實際值的指標
    void *ptr;
} robj;

這是我們的值物件資料結構定義,type就是具體的值的資料型別了,lru是物件最後一次被訪問的時間,由於只有24位,無法記錄完整的時間,因此只記錄了unix time的低24位,24 bits資料要溢位的話需要194天,而快取的資料更新非常頻繁,已經足夠了。詳細的關於lru精度的問題可以檢視原始碼。到這裡你應該對redis的結構有了基本瞭解。下面在來看如何挑選bestkey更簡單一些。

淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所屬db
int bestdbid;
// Redis資料庫
redisDb *db;
// 字典
dict *dict;
// 雜湊表節點
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
    server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
    // 淘汰池
    struct evictionPoolEntry *pool = EvictionPoolLRU;

    while(bestkey == NULL) {
        unsigned long total_keys = 0, keys;

        // 從每個資料庫抽樣key填充淘汰池
        for (i = 0; i < server.dbnum; i++) {
            db = server.db+i;
            // db->dict: 資料庫所有key集合
            // db->expires: 資料中設定過期時間的key集合
            // 判斷淘汰策略是否是針對所有鍵的
            dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
                    db->dict : db->expires;
            // 計算字典元素數量,不為0才可以挑選key
            if ((keys = dictSize(dict)) != 0) {
                // 填充淘汰池,四個引數分別為 dbid,候選集合,主字典集合,淘汰池
                // 填充完的淘汰池內部是有序的,按空閒時間升序
                evictionPoolPopulate(i, dict, db->dict, pool);
                // 已遍歷檢測過的key數量
                total_keys += keys;
            }
        }
        // 如果 total_keys = 0,沒有要淘汰的key(redis沒有key或者沒有設定過期時間的key),break
        if (!total_keys) break; /* No keys to evict. */

        // 遍歷淘汰池,從淘汰池末尾(空閒時間最長)開始向前迭代
        for (k = EVPOOL_SIZE-1; k >= 0; k--) {
            if (pool[k].key == NULL) continue;
            // 獲取當前key所屬的dbid
            bestdbid = pool[k].dbid;

            if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
                // 如果淘汰策略針對所有key,從 redisDb.dict 中取資料,redisDb.dict 指向所有的鍵值集合
                de = dictFind(server.db[pool[k].dbid].dict,
                    pool[k].key);
            } else { // 如果淘汰策略不是針對所有key,從 redisDb.expires 中取資料,redisDb.expires 指向已過期鍵值集合
                de = dictFind(server.db[pool[k].dbid].expires,
                    pool[k].key);
            }

           
            if (pool[k].key != pool[k].cached)
                sdsfree(pool[k].key);
            // 從池中刪除這個key,不管這個key還在不在
            pool[k].key = NULL;
            pool[k].idle = 0;

            // 如果這個節點存在,就跳出這個迴圈,否則嘗試下一個元素。
            // 這個節點可能已經不存在了,比如到了過期時間被刪除了
            if (de) {
                // de是key所在雜湊表節點,bestkey是 key 名
                bestkey = dictGetKey(de);
                break;
            } else {
                /* Ghost... Iterate again. */
            }
        }
    }
}

上面的原始碼註釋已經很清晰了,大概有如下幾個步驟:

  1. 初始化淘汰池
  2. 遍歷資料庫
  3. 根據淘汰策略是所有key還是過期key,從而選擇不同的資料集(redisDb.expires or redisDb.dict)
  4. 呼叫evictionPoolPopulate方法,傳入上一步選擇的資料集,填充淘汰池資料並排好序
  5. 從淘汰池中選取要淘汰的key(空閒時間最長的key),並且刪除淘汰池中該key的引用。如果該key已經沒了,那麼選取淘汰池中次優的key,直到找到一個還存在的key

核心步驟當然是evictionPoolPopulate方法,我們來來看看淘汰池的元素結構。

evict.c

// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池快取的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255

struct evictionPoolEntry {
    // 物件空閒時間
    // 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
    unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */
    sds key;                    /* Key name. */
    // 用來儲存一個sds物件留待複用,注意我們要複用的是sds的記憶體空間,只需關注cached的長度(決定是否可以複用),無需關注他的內容
    sds cached;                 /* Cached SDS object for key name. */
    int dbid;                   /* Key DB number. */
};

淘汰池不只可以給LRU使用,你可以在後面的原始碼中看到,LFU以及TTL也會使用淘汰池

知道了淘汰池長什麼樣子我相信下面的程式碼你就好理解了,無非是按idle欄位升序排序

evict.c

/* 
 * 這是 freeMemoryIfNeeded() 的輔助函式,用於在每次我們想要key過期時用一些條目填充 evictionPool。
 * 
 * 新增空閒時間小於當前所有key空閒時間的key,如果池是空的則key會一直被新增
 * 
 * 我們按升序將鍵依次插入,因此空閒時間較小的鍵在左側,而空閒時間較長的鍵在右側。
 */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
    int j, k, count;
    // 初始化抽樣集合,大小為 server.maxmemory_samples
    dictEntry *samples[server.maxmemory_samples];
    // 此函式對字典進行取樣以從隨機位置返回一些鍵
    count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
    for (j = 0; j < count; j++) {
        // 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
        unsigned long long idle;
        // key
        sds key;
        // 值物件
        robj *o;
        // 雜湊表節點
        dictEntry *de;

        de = samples[j];
        key = dictGetKey(de);

        /* 如果我們取樣的字典不是主字典(而是過期的字典),我們需要在鍵字典中再次查詢鍵以獲得值物件。*/
        if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
            if (sampledict != keydict) de = dictFind(keydict, key);
            o = dictGetVal(de);
        }

        /* 根據策略計算空閒時間。 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。*/
        if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
            // 給定一個物件,使用近似的 LRU 演算法返回未請求過該物件的最小毫秒數
            idle = estimateObjectIdleTime(o);
        } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用這個池子
            idle = 255-LFUDecrAndReturn(o);
        } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
            // 在這種情況下,越早過期越好。
            idle = ULLONG_MAX - (long)dictGetVal(de);
        } else {
            serverPanic("Unknown eviction policy in evictionPoolPopulate()");
        }

        /* 將元素插入池中*/
        k = 0;
        // 遍歷淘汰池,從左邊開始,找到第一個空桶或者第一個空閒時間大於等於待選元素的桶,k是該元素的座標
        while (k < EVPOOL_SIZE &&
               pool[k].key &&
               pool[k].idle < idle) k++;
        if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
            /* 如果元素小於我們擁有的最差元素並且沒有空桶,則無法插入。
             * 
             * key == 0 說明上面的while迴圈一次也沒有進入
             * 要麼第一個元素就是空的,要麼所有已有元素的空閒時間都大於等於待插入元素的空閒時間(待插入元素比已有所有元素都優質)
             * 又因為陣列最後一個key不為空,因為是從左邊開始插入的,所以排除了第一個元素是空的
             */
            continue;
        } else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
            /* 插入空桶,插入前無需設定 */
        } else {
            /* 插入中間,現在 k 指向比要插入的元素空閒時間大的第一個元素 */
            if (pool[EVPOOL_SIZE-1].key == NULL) {
                /* 陣列末尾有空桶,將所有元素從 k 向右移動到末尾。*/
                /* 覆蓋前儲存 SDS */
                sds cached = pool[EVPOOL_SIZE-1].cached;
                // 注意這裡不設定 pool[k], 只是給 pool[k] 騰位置
                memmove(pool+k+1,pool+k,
                    sizeof(pool[0])*(EVPOOL_SIZE-k-1));
                // 轉移 cached (sds物件)
                pool[k].cached = cached;
            } else {
                
                /* 右邊沒有可用空間? 在 k-1 處插入 */
                k--;
                /*
                 * 將k(包含)左側的所有元素向左移動,因此我們丟棄空閒時間較小的元素。
                 */
                sds cached = pool[0].cached;
                if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
                memmove(pool,pool+1,sizeof(pool[0])*k);
                pool[k].cached = cached;
            }
        }
        /*
         * 嘗試重用在池條目中分配的快取 SDS 字串,因為分配和釋放此物件的成本很高
         * 注意真正要複用的sds記憶體空間,避免重新申請記憶體,而不是他的值
         */
        int klen = sdslen(key);
        // 判斷字串長度來決定是否複用sds
        if (klen > EVPOOL_CACHED_SDS_SIZE) {
            // 複製一個新的 sds 字串並賦值
            pool[k].key = sdsdup(key);
        } else {
            /*
             * 記憶體拷貝函式,從資料來源拷貝num個位元組的資料到目標陣列
             * 
             * destination:指向目標陣列的指標 
             * source:指向資料來源的指標 
             * num:要拷貝的位元組數
             *
             */
            // 複用sds物件
            memcpy(pool[k].cached,key,klen+1);
            // 重新設定sds長度
            sdssetlen(pool[k].cached,klen);
            // 真正設定key
            pool[k].key = pool[k].cached;
        }
        // 設定空閒時間
        pool[k].idle = idle;
        // 設定key所在db
        pool[k].dbid = dbid;
    }
}

至此,經過以上步驟,我們能夠得到一個bestkey(LRU策略下即最長空閒時間的key),還有一個有了部分資料的淘汰池。

下一步,我們就要刪除這個key來釋放記憶體空間了。

第三步:如何刪除key

// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    // 最佳淘汰key
    sds bestkey = NULL;
    // LRU策略或者LFU策略或者VOLATILE_TTL策略
    if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
        server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
    {
        // 不同的策略找bestKey
    }
    // 最後選定的要刪除的key
    if (bestkey) {
        db = server.db+bestdbid;
        robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
        // 處理過期key到從節點和 AOF 檔案
        // 當 master 中的 key 過期時,則將此 key 的 DEL 操作傳送到所有 slaves 和 AOF 檔案(如果啟用)。
        propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
       
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
    }
}

第四步:什麼時候停止淘汰key

通過上面的原始碼我們能看到while迴圈條件是 while (mem_freed < mem_tofree),而在每一次刪除key的時候,都會累加已釋放key所佔有的記憶體:

// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
    }
}

除此之外,如果配置開啟了server.lazyfree_lazy_eviction(非同步淘汰),那麼每淘汰一些key後還會檢查一下記憶體狀態,如果記憶體已經達到期望了,那麼就可以手動滿足迴圈終止條件。

// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
    if (bestkey) {
        // 獲取已使用記憶體
        delta = (long long) zmalloc_used_memory();
        
        // 是否開啟lazyfree機制
        // lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
        if (server.lazyfree_lazy_eviction)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        
        // 計算刪除key後的記憶體變化量
        delta -= (long long) zmalloc_used_memory();
        
        // 計算已釋放記憶體
        mem_freed += delta;
        
        /*
         * 通常我們的停止條件是能夠釋放固定的、預先計算的記憶體量。 
         * 然而,當我們在另一個執行緒中刪除物件時,最好不時檢查我們是否已經到達我們的目標記憶體,因為“mem_freed”數量僅在 dbAsyncDelete() 呼叫中計算,而執行緒可以無時無刻釋放記憶體
         */
        if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
            if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
                /* Let's satisfy our stop condition. */
                // 手動滿足停止條件
                mem_freed = mem_tofree;
            }
        }
        
    }
    
}

最後

限於篇幅,儘可能多的放了原始碼,沒有連貫的文字描述,如果你覺得這樣看起來比較累的話,可以去我的Github看看原始碼註釋