LRU工程實現原始碼(一):Redis 記憶體淘汰策略
原始碼版本 Redis 6.0.0
記憶體淘汰是什麼?什麼時候記憶體淘汰
我們知道,當某個key被設定了過期時間之後,客戶端每次對該key的訪問(讀寫)都會事先檢測該key是否過期,如果過期就直接刪除;但有一些鍵只訪問一次,因此需要主動刪除,預設情況下redis每秒檢測10次,檢測的物件是所有設定了過期時間的鍵集合,每次從這個集合中隨機檢測20個鍵檢視他們是否過期,如果過期就直接刪除。
記憶體淘汰雖然也是刪除key,但是與我們設定的過期key刪除不同。
記憶體淘汰是說,如果redis已使用記憶體達到預設的limit值(你可以當做記憶體滿了),如果想要繼續儲存新的key,那麼必然要淘汰一些key來釋放記憶體空間。具體淘汰哪些key由不同的淘汰策來決定。
由於需要淘汰掉key,所以此時redis不適合用作key-value資料庫了,開啟了記憶體淘汰的redis一般用於快取中,因此很多人也把他叫快取淘汰和快取淘汰策略。
又因為記憶體淘汰的觸發時機是記憶體使用達到閾值,為了儲存新的key然後淘汰舊的key,因此也有人把記憶體淘汰策略叫做快取替換策略
記憶體淘汰策略
Redis 4.0 之前一共實現了 6 種記憶體淘汰策略,在 4.0 之後,又增加了 2 種策略。
我們可以按照是否會進行資料淘汰把它們分成兩類:
- 不進行資料淘汰的策略,只有 noeviction 這一種。
- 會進行淘汰的 7 種其他策略。
會進行淘汰的 7 種策略,我們可以再進一步根據淘汰候選資料集的範圍把它們分成兩類:
- 在設定了過期時間的資料中進行淘汰,包括 volatile-random、volatile-ttl、volatile-lru、volatile-lfu(Redis 4.0 後新增)四種。
- 在所有資料範圍內進行淘汰,包括 allkeys-lru、allkeys-random、allkeys-lfu(Redis 4.0 後新增)三種。我把這 8 種策略的分類,畫到了一張圖裡:
這8種策略的簡單描述都可以在redis.conf
檔案中找到
# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
翻譯下:
# MAXMEMORY POLICY:當達到最大記憶體時,Redis 將如何選擇要刪除的內容
# 到達了。 您可以從以下行為中選擇一種:
#
# volatile-lru -> Evict 使用近似 LRU,只有設定了過期時間的鍵。
# allkeys-lru -> 使用近似 LRU 驅逐所有鍵。
# volatile-lfu -> 使用近似 LFU 驅逐,只有設定了過期時間的鍵。
# allkeys-lfu -> 使用近似 LFU 驅逐所有鍵。
# volatile-random -> 隨機刪除設定了過期時間的key。
# allkeys-random -> 隨機刪除一個鍵,任何鍵。
# volatile-ttl -> 刪除過期時間最近的key(次TTL)
# noeviction -> 不要驅逐任何東西,只是在寫操作時返回一個錯誤。
通過redis.conf
檔案中的內容我們可以知道預設策略是noeviction
# The default is:
#
# maxmemory-policy noeviction
Redis中的LRU淘汰演算法
限於篇幅,本文主要以LRU淘汰演算法展開講解,其他演算法流程大同小異,可在我的redis原始碼中文註釋專案中檢視
不瞭解LRU演算法基礎的可以先去百度瞭解下,很簡單
這裡就不講LRU演算法的基本知識了,我們來看Redis的LRU演算法的原理是怎麼樣的。
其實,LRU 演算法背後的想法非常樸素:它認為剛剛被訪問的資料,肯定還會被再次訪問,所以就把它放在 MRU 端;長久不訪問的資料,肯定就不會再被訪問了,所以就讓它逐漸後移到 LRU 端,在快取滿時,就優先刪除它。
不過,LRU 演算法在實際實現時,需要用連結串列管理所有的快取資料,這會帶來額外的空間開銷。而且,當有資料被訪問時,需要在連結串列上把該資料移動到 MRU 端,如果有大量資料被訪問,就會帶來很多連結串列移動操作,會很耗時,進而會降低 Redis 快取效能。
所以,在 Redis 中,LRU 演算法被做了簡化,以減輕資料淘汰對快取效能的影響。具體來說,Redis 預設會記錄每個資料的最近一次訪問的時間戳(由鍵值對資料結構 RedisObject 中的 lru 欄位記錄)。然後,Redis 在決定淘汰的資料時,第一次會隨機選出 N 個數據,把它們作為一個候選集合。接下來,Redis 會比較這 N 個數據的 lru 欄位,把 lru 欄位值最小的資料從快取中淘汰出去。
Redis 提供了一個配置引數 maxmemory-samples,這個引數就是 Redis 選出的資料個數 N。例如,我們執行如下命令,可以讓 Redis 選出 100 個數據作為候選資料集:
CONFIG SET maxmemory-samples 100
redis.conf
檔案中預設配置是5
# The default of 5 produces good enough results. 10 Approximates very closely
# true LRU but costs more CPU. 3 is faster but not very accurate.
#
# maxmemory-samples 5
當需要再次淘汰資料時,Redis 需要挑選資料進入第一次淘汰時建立的候選集合。這兒的挑選標準是:能進入候選集合的資料的 lru 欄位值必須小於候選集合中最小的 lru 值。當有新資料進入候選資料集後,如果候選資料集中的資料個數達到了 maxmemory-samples,Redis 就把候選資料集中 lru 欄位值最小的資料淘汰出去。
這樣一來,Redis 快取不用為所有的資料維護一個大連結串列,也不用在每次資料訪問時都移動連結串列項,提升了快取的效能。
原始碼剖析
原始碼中文註釋可以在我的 github 上 的 Redis 6.0 中文註釋專案裡看到,歡迎訪問
第一步:什麼時候開始淘汰key
配置讀取
struct redisServer {
// 可使用的最大記憶體位元組數
unsigned long long maxmemory; /* Max number of memory bytes to use */
// key 淘汰策略
int maxmemory_policy; /* Policy for key eviction */
// 隨機抽樣精度
int maxmemory_samples; /* Precision of random sampling */
}
檢查時機
執行記憶體淘汰的方法是freeMemoryIfNeeded
,實現在evict.c
中.
注意redis 3.0版本中該方法在
redis.c
中
這個方法主要的作用是檢查記憶體狀態,如果記憶體達到閾值就進行記憶體淘汰,釋放記憶體空間直至到閾值以下。
開啟redis原始碼可以很輕易搜尋到該方法是被freeMemoryIfNeededAndSafe
呼叫的
/*
* 這是 freeMemoryIfNeeded() 的包裝器,只有在現在有條件安全地執行時才真正呼叫該函式:
* - 超時條件下不能有指令碼。
* - 現在沒有載入資料。
*/
int freeMemoryIfNeededAndSafe(void) {
if (server.lua_timedout || server.loading) return C_OK;
return freeMemoryIfNeeded();
}
那麼這個方法又是被誰呼叫的呢,有兩個地方
- updateMaxmemory
- processCommand
第一個看名字是在更新Maxmemory時呼叫的,本文不講,我們主要來看第二個方法
在處理命令的時候,會呼叫server.c
中的processCommand
方法
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
*
* 如果這個函式被呼叫,我們已經讀取了整個命令,引數在客戶端 argv/argc 欄位中。
*
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* processCommand() 執行命令或準備伺服器以從客戶端進行批量讀取。
*/
int processCommand(client *c) {
// 省略...
// 如果配置了 server.maxmemory 並且 lua指令碼沒有在超時條件下執行
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
// 省略...
}
// 省略...
}
getMaxmemoryState
緊接著會在freeMemoryIfNeeded
方法中檢查記憶體狀態來判斷是否需要記憶體淘汰
/*
* 根據當前的“maxmemory”設定,定期呼叫此函式以檢視是否有可用記憶體。 如果我們超過記憶體限制,該函式將嘗試釋放一些記憶體以返回低於限制。
*
* 如果我們低於記憶體限制或超過限制但嘗試釋放記憶體成功,該函式將返回 C_OK。
* 否則,如果我們超過了記憶體限制,但沒有足夠的記憶體被釋放以返回低於限制,則該函式返回 C_ERR。
*/
int freeMemoryIfNeeded(void) {
// 預設情況下,從節點應該忽略maxmemory指令,僅僅做從節點該做的事情就好
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
// 當客戶端暫停時,資料集應該是靜態的,不僅來自客戶端的 POV 無法寫入,還有來自POV過期和驅逐key也無法執行。
if (clientsArePaused()) return C_OK;
// 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
}
getMaxmemoryState
方法也在evict.c
中:
/* 從maxmemory指令的角度獲取記憶體狀態:
* 如果使用的記憶體低於 maxmemory 設定,則返回 C_OK。
* 否則,如果超過記憶體限制,函式返回
* C_ERR。
*
* 該函式可能會通過引用返回附加資訊,僅當
* 指向相應引數的指標不為 NULL。某些欄位是
* 僅在返回 C_ERR 時填充:
*
* 'total' 使用的總位元組數。
*(為 C_ERR 和 C_OK 填充)
*
* 'logical' 使用的記憶體量減去從/AOF緩衝區。
*(返回 C_ERR 時填充)
*
* 'tofree' 為了回到記憶體限制應該釋放的記憶體量
*
*(返回 C_ERR 時填充)
*
* 'level' 這通常範圍從 0 到 1,並報告數量
* 當前使用的記憶體。如果我們超過了記憶體限制可能會 > 1。
*(為 C_ERR 和 C_OK 填充)
*/
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level)
通過以上原始碼,我們能夠知道什麼時候會記憶體淘汰。
下一步我們來看看會淘汰哪些key
第二步:淘汰哪些key
freeMemoryIfNeeded
文章正文中我們提到過記憶體淘汰策略,但是第一步並沒有涉及到具體的策略。那麼不同的策略是在哪裡實現的呢?這是在freeMemoryIfNeeded
方法中判斷的,順便這裡先帶你瞭解下freeMemoryIfNeeded
方法的骨架
int freeMemoryIfNeeded(void) {
/*
*
* mem_reported 已使用記憶體
* mem_tofree 需要釋放的記憶體
* mem_freed 已釋放記憶體
*/
size_t mem_reported, mem_tofree, mem_freed;
// 檢查記憶體狀態,有沒有超出限制,如果有,會計算需要釋放的記憶體和已使用記憶體
// 這個方法不但會計算記憶體,還會賦值 mem_reported mem_freed
if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL) == C_OK)
return C_OK;
// 初始化已釋放記憶體的位元組數為 0
mem_freed = 0;
// 不進行記憶體淘汰
if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION)
goto cant_free; /* We need to free memory, but policy forbids. */
// 根據 maxmemory 策略,
// 遍歷字典,釋放記憶體並記錄被釋放記憶體的位元組數
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
/* volatile-random and allkeys-random policy */
else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM)
{
// 不同的策略找bestKey
}
// 最後選定的要刪除的key
if (bestkey) {
// 在這裡刪除key
}
}
}
上面的原始碼省略了很多東西,但我相信你已經知道這個方法在做什麼了,也應該知道不同的策略其實就是在找不同的key。我們重點分析LRU策略下的bestKey
選擇。
redis索引
這個時候要去redis資料庫拿東西了,查詢的話還是需要索引的,因此我先向你簡單羅列下Redis索引相關的資料結構,很簡單,不做文字表述了
server.h
// Redis 資料庫。 有多個數據庫由從 0(預設資料庫)到最大配置資料庫的整數標識。 資料庫編號是結構中的“id”欄位。
typedef struct redisDb {
/* 資料庫鍵空間,儲存著資料庫中的所有鍵值對 */
dict *dict; /* The keyspace for this DB */
// 設定超時時間的key集合
dict *expires; /* Timeout of keys with a timeout set */
// 客戶端等待資料的key (BLPOP)
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
// 收到 PUSH 被阻塞的key
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
// 資料庫的鍵的平均 TTL ,統計資訊
long long avg_ttl; /* Average TTL, just for stats */
//
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
// 嘗試逐一進行碎片整理的鍵名列表。
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
注意 redisDb 有兩個重要的dict,*dict是主字典,儲存所有的鍵集合, *expires儲存設定了超時時間的集合,後面根據不同的淘汰策略就可以從不同的資料集拿資料
dict.h
/*
* 字典
*/
typedef struct dict {
// 型別特定函式
dictType *type;
// 私有資料
void *privdata;
// 雜湊表
dictht ht[2];
// rehash 索引
// 當 rehash 不在進行時,值為 -1
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
// 當前執行的迭代器數量
unsigned long iterators; /* number of iterators currently running */
} dict;
每個dict又儲存了2個dictht,為什麼是兩個呢?用來實現漸進式 rehash
/* 雜湊表
* 每個字典都使用兩個雜湊表,從而實現漸進式 rehash 。
*/
typedef struct dictht {
// 雜湊表陣列
dictEntry **table;
// 雜湊表大小
unsigned long size;
// 雜湊表大小掩碼,用於計算索引值
// 總是等於 size - 1
unsigned long sizemask;
// 該雜湊表已有節點的數量
unsigned long used;
} dictht;
到了雜湊表了,這個是真正儲存元素的地方了,再來看雜湊表元素長什麼樣
/*
* 雜湊表節點
*/
typedef struct dictEntry {
// 鍵
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下個雜湊表節點,形成連結串列
struct dictEntry *next;
} dictEntry;
以上就是redis索引相關的資料結構,我們可以看到redis索引是通過雜湊表來實現的。上面說的是索引的儲存,那麼我們放入redis的value儲存在哪呢?
server.h
typedef struct redisObject {
// 型別
unsigned type:4;
// 編碼
unsigned encoding:4;
// 物件最後一次被訪問的時間
unsigned lru:LRU_BITS;
// 引用計數
int refcount;
// 指向實際值的指標
void *ptr;
} robj;
這是我們的值物件資料結構定義,type就是具體的值的資料型別了,lru是物件最後一次被訪問的時間,由於只有24位,無法記錄完整的時間,因此只記錄了unix time的低24位,24 bits資料要溢位的話需要194天,而快取的資料更新非常頻繁,已經足夠了。詳細的關於lru精度的問題可以檢視原始碼。到這裡你應該對redis的結構有了基本瞭解。下面在來看如何挑選bestkey更簡單一些。
淘汰池
// 最佳淘汰key
sds bestkey = NULL;
// key所屬db
int bestdbid;
// Redis資料庫
redisDb *db;
// 字典
dict *dict;
// 雜湊表節點
dictEntry *de;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 淘汰池
struct evictionPoolEntry *pool = EvictionPoolLRU;
while(bestkey == NULL) {
unsigned long total_keys = 0, keys;
// 從每個資料庫抽樣key填充淘汰池
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
// db->dict: 資料庫所有key集合
// db->expires: 資料中設定過期時間的key集合
// 判斷淘汰策略是否是針對所有鍵的
dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->dict : db->expires;
// 計算字典元素數量,不為0才可以挑選key
if ((keys = dictSize(dict)) != 0) {
// 填充淘汰池,四個引數分別為 dbid,候選集合,主字典集合,淘汰池
// 填充完的淘汰池內部是有序的,按空閒時間升序
evictionPoolPopulate(i, dict, db->dict, pool);
// 已遍歷檢測過的key數量
total_keys += keys;
}
}
// 如果 total_keys = 0,沒有要淘汰的key(redis沒有key或者沒有設定過期時間的key),break
if (!total_keys) break; /* No keys to evict. */
// 遍歷淘汰池,從淘汰池末尾(空閒時間最長)開始向前迭代
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
// 獲取當前key所屬的dbid
bestdbid = pool[k].dbid;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
// 如果淘汰策略針對所有key,從 redisDb.dict 中取資料,redisDb.dict 指向所有的鍵值集合
de = dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
} else { // 如果淘汰策略不是針對所有key,從 redisDb.expires 中取資料,redisDb.expires 指向已過期鍵值集合
de = dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
if (pool[k].key != pool[k].cached)
sdsfree(pool[k].key);
// 從池中刪除這個key,不管這個key還在不在
pool[k].key = NULL;
pool[k].idle = 0;
// 如果這個節點存在,就跳出這個迴圈,否則嘗試下一個元素。
// 這個節點可能已經不存在了,比如到了過期時間被刪除了
if (de) {
// de是key所在雜湊表節點,bestkey是 key 名
bestkey = dictGetKey(de);
break;
} else {
/* Ghost... Iterate again. */
}
}
}
}
上面的原始碼註釋已經很清晰了,大概有如下幾個步驟:
- 初始化淘汰池
- 遍歷資料庫
- 根據淘汰策略是所有key還是過期key,從而選擇不同的資料集(redisDb.expires or redisDb.dict)
- 呼叫
evictionPoolPopulate
方法,傳入上一步選擇的資料集,填充淘汰池資料並排好序 - 從淘汰池中選取要淘汰的key(空閒時間最長的key),並且刪除淘汰池中該key的引用。如果該key已經沒了,那麼選取淘汰池中次優的key,直到找到一個還存在的key
核心步驟當然是evictionPoolPopulate
方法,我們來來看看淘汰池的元素結構。
evict.c
// 淘汰池大小
#define EVPOOL_SIZE 16
// 淘汰池快取的最大sds大小
#define EVPOOL_CACHED_SDS_SIZE 255
struct evictionPoolEntry {
// 物件空閒時間
// 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
unsigned long long idle; /* Object idle time (inverse frequency for LFU) */
sds key; /* Key name. */
// 用來儲存一個sds物件留待複用,注意我們要複用的是sds的記憶體空間,只需關注cached的長度(決定是否可以複用),無需關注他的內容
sds cached; /* Cached SDS object for key name. */
int dbid; /* Key DB number. */
};
淘汰池不只可以給LRU使用,你可以在後面的原始碼中看到,LFU以及TTL也會使用淘汰池
知道了淘汰池長什麼樣子我相信下面的程式碼你就好理解了,無非是按idle
欄位升序排序
evict.c
/*
* 這是 freeMemoryIfNeeded() 的輔助函式,用於在每次我們想要key過期時用一些條目填充 evictionPool。
*
* 新增空閒時間小於當前所有key空閒時間的key,如果池是空的則key會一直被新增
*
* 我們按升序將鍵依次插入,因此空閒時間較小的鍵在左側,而空閒時間較長的鍵在右側。
*/
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
// 初始化抽樣集合,大小為 server.maxmemory_samples
dictEntry *samples[server.maxmemory_samples];
// 此函式對字典進行取樣以從隨機位置返回一些鍵
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
// 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。
unsigned long long idle;
// key
sds key;
// 值物件
robj *o;
// 雜湊表節點
dictEntry *de;
de = samples[j];
key = dictGetKey(de);
/* 如果我們取樣的字典不是主字典(而是過期的字典),我們需要在鍵字典中再次查詢鍵以獲得值物件。*/
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = dictGetVal(de);
}
/* 根據策略計算空閒時間。 這被稱為空閒只是因為程式碼最初處理 LRU,但實際上只是一個分數,其中更高的分數意味著更好的候選者。*/
if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) {
// 給定一個物件,使用近似的 LRU 演算法返回未請求過該物件的最小毫秒數
idle = estimateObjectIdleTime(o);
} else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { // LFU 策略也是用這個池子
idle = 255-LFUDecrAndReturn(o);
} else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
// 在這種情況下,越早過期越好。
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
/* 將元素插入池中*/
k = 0;
// 遍歷淘汰池,從左邊開始,找到第一個空桶或者第一個空閒時間大於等於待選元素的桶,k是該元素的座標
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* 如果元素小於我們擁有的最差元素並且沒有空桶,則無法插入。
*
* key == 0 說明上面的while迴圈一次也沒有進入
* 要麼第一個元素就是空的,要麼所有已有元素的空閒時間都大於等於待插入元素的空閒時間(待插入元素比已有所有元素都優質)
* 又因為陣列最後一個key不為空,因為是從左邊開始插入的,所以排除了第一個元素是空的
*/
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* 插入空桶,插入前無需設定 */
} else {
/* 插入中間,現在 k 指向比要插入的元素空閒時間大的第一個元素 */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* 陣列末尾有空桶,將所有元素從 k 向右移動到末尾。*/
/* 覆蓋前儲存 SDS */
sds cached = pool[EVPOOL_SIZE-1].cached;
// 注意這裡不設定 pool[k], 只是給 pool[k] 騰位置
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
// 轉移 cached (sds物件)
pool[k].cached = cached;
} else {
/* 右邊沒有可用空間? 在 k-1 處插入 */
k--;
/*
* 將k(包含)左側的所有元素向左移動,因此我們丟棄空閒時間較小的元素。
*/
sds cached = pool[0].cached;
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
/*
* 嘗試重用在池條目中分配的快取 SDS 字串,因為分配和釋放此物件的成本很高
* 注意真正要複用的sds記憶體空間,避免重新申請記憶體,而不是他的值
*/
int klen = sdslen(key);
// 判斷字串長度來決定是否複用sds
if (klen > EVPOOL_CACHED_SDS_SIZE) {
// 複製一個新的 sds 字串並賦值
pool[k].key = sdsdup(key);
} else {
/*
* 記憶體拷貝函式,從資料來源拷貝num個位元組的資料到目標陣列
*
* destination:指向目標陣列的指標
* source:指向資料來源的指標
* num:要拷貝的位元組數
*
*/
// 複用sds物件
memcpy(pool[k].cached,key,klen+1);
// 重新設定sds長度
sdssetlen(pool[k].cached,klen);
// 真正設定key
pool[k].key = pool[k].cached;
}
// 設定空閒時間
pool[k].idle = idle;
// 設定key所在db
pool[k].dbid = dbid;
}
}
至此,經過以上步驟,我們能夠得到一個bestkey(LRU策略下即最長空閒時間的key),還有一個有了部分資料的淘汰池。
下一步,我們就要刪除這個key來釋放記憶體空間了。
第三步:如何刪除key
// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
// 最佳淘汰key
sds bestkey = NULL;
// LRU策略或者LFU策略或者VOLATILE_TTL策略
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
// 不同的策略找bestKey
}
// 最後選定的要刪除的key
if (bestkey) {
db = server.db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
// 處理過期key到從節點和 AOF 檔案
// 當 master 中的 key 過期時,則將此 key 的 DEL 操作傳送到所有 slaves 和 AOF 檔案(如果啟用)。
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
}
}
第四步:什麼時候停止淘汰key
通過上面的原始碼我們能看到while迴圈條件是 while (mem_freed < mem_tofree)
,而在每一次刪除key的時候,都會累加已釋放key所佔有的記憶體:
// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
if (bestkey) {
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
}
}
除此之外,如果配置開啟了server.lazyfree_lazy_eviction
(非同步淘汰),那麼每淘汰一些key後還會檢查一下記憶體狀態,如果記憶體已經達到期望了,那麼就可以手動滿足迴圈終止條件。
// 不斷迴圈刪除key,直至釋放足夠的記憶體
while (mem_freed < mem_tofree) {
if (bestkey) {
// 獲取已使用記憶體
delta = (long long) zmalloc_used_memory();
// 是否開啟lazyfree機制
// lazyfree的原理就是在刪除物件時只是進行邏輯刪除,然後把物件丟給後臺,讓後臺執行緒去執行真正的destruct,避免由於物件體積過大而造成阻塞。
if (server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
// 計算刪除key後的記憶體變化量
delta -= (long long) zmalloc_used_memory();
// 計算已釋放記憶體
mem_freed += delta;
/*
* 通常我們的停止條件是能夠釋放固定的、預先計算的記憶體量。
* 然而,當我們在另一個執行緒中刪除物件時,最好不時檢查我們是否已經到達我們的目標記憶體,因為“mem_freed”數量僅在 dbAsyncDelete() 呼叫中計算,而執行緒可以無時無刻釋放記憶體
*/
if (server.lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */
// 手動滿足停止條件
mem_freed = mem_tofree;
}
}
}
}
最後
限於篇幅,儘可能多的放了原始碼,沒有連貫的文字描述,如果你覺得這樣看起來比較累的話,可以去我的Github看看原始碼註釋