redis原始碼分析之十一記憶體管理
一、redis的記憶體管理
一般來說,稍微有點規模的軟體,都會自己搞一塊記憶體管理,原因很簡單,統一管理記憶體,適應自己的場景。其實按大牛們的話,這未必是最優選擇,實在是小看了寫庫的那群大牛們。不過說歸說,人家寫也不會給你報備,想寫自然就寫了。Redis就聽從了大牛的看法,使用了底層更好的記憶體分配庫,根據情況使用tmalloc,jemalloc 以及glibc中的 malloc(pmalloc)。
一般來說,記憶體管理不外乎這麼幾塊:
記憶體大小的管理,比如分不分頁,一頁多大為好。記憶體分配管理,怎麼分配,多大一塊;記憶體回收管理,什麼時候兒回收,回收後如何處理,記憶體碎片如何處理;記憶體回收的策略;記憶體的讀寫操作和異常控制等。針對上面的這些實際情況,結合Redis分析一下其相關內容:
記憶體回收管理:使用LRU,近似LRU和LFU等策略進行記憶體的回收控制;通過自動記憶體交換進行記憶體碎片的處理;
讀寫和異常控制:讀寫操作為了快速,以固定值來操作,換句話說強制位元組對齊;而在記憶體超出使用範圍後,使用記憶體溢位管理和處理這種異常情況,這也是引起記憶體碎片的原因;
從上面的分析是不是可以看出,其實設計就是平衡,根據實際情況進行取捨。
二、原始碼分析
下面就根據上面的分析對原始碼進行解讀。
1、基礎的記憶體管理
在Redist的zmalloc.h標頭檔案中定義:
#ifndef__ZMALLOC_H #define__ZMALLOC_H /*Doubleexpansionneededforstringificationofmacrovalues.*/ #define__xstr(s)__str(s) #define__str(s)#s #ifdefined(USE_TCMALLOC) #defineZMALLOC_LIB("tcmalloc-"__xstr(TC_VERSION_MAJOR)"."__xstr(TC_VERSION_MINOR)) #include<google/tcmalloc.h> #if(TC_VERSION_MAJOR==1&&TC_VERSION_MINOR>=6)||(TC_VERSION_MAJOR>1) #defineHAVE_MALLOC_SIZE1 #definezmalloc_size(p)tc_malloc_size(p) #else #error"Newerversionoftcmallocrequired" #endif #elifdefined(USE_JEMALLOC) #defineZMALLOC_LIB("jemalloc-"__xstr(JEMALLOC_VERSION_MAJOR)"."__xstr(JEMALLOC_VERSION_MINOR)"."__xstr(JEMALLOC_VERSION_BUGFIX)) #include<jemalloc/jemalloc.h> #if(JEMALLOC_VERSION_MAJOR==2&&JEMALLOC_VERSION_MINOR>=1)||(JEMALLOC_VERSION_MAJOR>2) #defineHAVE_MALLOC_SIZE1 #definezmalloc_size(p)je_malloc_usable_size(p) #else #error"Newerversionofjemallocrequired" #endif #elifdefined(__APPLE__) #include<malloc/malloc.h> #defineHAVE_MALLOC_SIZE1 #definezmalloc_size(p)malloc_size(p) #endif #ifndefZMALLOC_LIB #defineZMALLOC_LIB"libc" #ifdef__GLIBC__ #include<malloc.h> #defineHAVE_MALLOC_SIZE1 #definezmalloc_size(p)malloc_usable_size(p) #endif #endif
通過巨集的定義來檢查當前的機器上是否安裝了相關的記憶體庫,前面兩個就是針對tmalloc和 jemalloc來定義的。如果有這兩個更有優勢的庫,那麼就優勢使用這兩個庫。如果都沒有,那麼就使用glibc中的malloc。它的主要介面也很簡單:
//分配記憶體,根據情況呼叫不同的庫實現
void*zmalloc(size_tsize);
//重寫塊分配記憶體,但策略有所不同,不再成倍增加
void*zcalloc(size_tsize);
//重新調整記憶體
void*zrealloc(void*ptr,size_tsize);
voidzfree(void*ptr);
//字元持久化
char*zstrdup(constchar*s);
//當前使用記憶體大小
size_tzmalloc_used_memory(void);
//異常呼叫
voidzmalloc_set_oom_handler(void(*oom_handler)(size_t));
//獲取程序當前記憶體大小(RSS,未被交換的記憶體)
size_tzmalloc_get_rss(void);
intzmalloc_get_allocator_info(size_t*allocated,size_t*active,size_t*resident);
voidset_jemalloc_bg_thread(intenable);
intjemalloc_purge();//jemalloc的記憶體碎片清理
size_tzmalloc_get_private_dirty(longpid);//程序修改記憶體
size_tzmalloc_get_smap_bytes_by_field(char*field,longpid);///proc/self/smaps中的資料
size_tzmalloc_get_memory_size(void);//實體記憶體大小
voidzlibc_free(void*ptr);
#ifdefHAVE_DEFRAG
voidzfree_no_tcache(void*ptr);
void*zmalloc_no_tcache(size_tsize);
#endif
#ifndefHAVE_MALLOC_SIZE
//獲得當前記憶體塊總大小
size_tzmalloc_size(void*ptr);
size_tzmalloc_usable(void*ptr);
#else
#definezmalloc_usable(p)zmalloc_size(p)
#endif
看一下基礎的記憶體分配:
#ifdefined(USE_TCMALLOC)
#definemalloc(size)tc_malloc(size)
#definecalloc(count,size)tc_calloc(count,size)
#definerealloc(ptr,size)tc_realloc(ptr,size)
#definefree(ptr)tc_free(ptr)
#elifdefined(USE_JEMALLOC)
#definemalloc(size)je_malloc(size)
#definecalloc(count,size)je_calloc(count,size)
#definerealloc(ptr,size)je_realloc(ptr,size)
#definefree(ptr)je_free(ptr)
#definemallocx(size,flags)je_mallocx(size,flags)
#definedallocx(ptr,flags)je_dallocx(ptr,flags)
#endif
//其實就是一個鎖來控制記憶體的多執行緒安全的增長數量
#defineatomicIncr(var,count)do{\這個斜槓,表明這個巨集定義和下面是連線在一起的
pthread_mutex_lock(&var##_mutex);\兩個#表示兩個字元連線在一起,一個表示這是一個變數
var+=(count);\
pthread_mutex_unlock(&var##_mutex);\
}while(0)
#defineatomicGetIncr(var,oldvalue_var,count)do{\
pthread_mutex_lock(&var##_mutex);\
oldvalue_var=var;\
var+=(count);\
pthread_mutex_unlock(&var##_mutex);\
}while(0)
//dowhile在這裡是為了保證巨集的完整性的一種技巧,不必當成一個迴圈
#defineupdate_zmalloc_stat_alloc(__n)do{\
//下面的巨集程式碼是為了判斷8位元組對齊
size_t_n=(__n);\
if(_n&(sizeof(long)-1))_n+=sizeof(long)-(_n&(sizeof(long)-1));\
atomicIncr(used_memory,__n);\
}while(0)
#defineupdate_zmalloc_stat_free(__n)do{\
size_t_n=(__n);\
if(_n&(sizeof(long)-1))_n+=sizeof(long)-(_n&(sizeof(long)-1));\
atomicDecr(used_memory,__n);\
}while(0)
//輸出錯誤並列印錯誤資訊
staticvoidzmalloc_default_oom(size_tsize){
fprintf(stderr,"zmalloc:Outofmemorytryingtoallocate%zubytes\n",
size);
fflush(stderr);
abort();
}
staticvoid(*zmalloc_oom_handler)(size_t)=zmalloc_default_oom;
void*zmalloc(size_tsize){
void*ptr=malloc(size+PREFIX_SIZE);//多分配一個PREFIX_SIZE用來儲存Size
if(!ptr)zmalloc_oom_handler(size);
#ifdefHAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr));
returnptr;
#else
*((size_t*)ptr)=size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return(char*)ptr+PREFIX_SIZE;
#endif
}
只根據巨集配置來呼叫相應的malloc,如果失敗,則將異常函式丟擲。在update_zmalloc_stat_alloc中的巨集,通過對8位元組對齊的判斷來實現對記憶體used_memory的精確度量。因為無論哪種malloc,本身都是會按照相應的位元組對齊來實現的。下面的atomicIncr其實就是多執行緒安全的增加記憶體數量的控制。
再看一下記憶體的塊控制和重新調整:
void*zcalloc(size_tsize){
//仍然是呼叫calloc
void*ptr=calloc(1,size+PREFIX_SIZE);
if(!ptr)zmalloc_oom_handler(size);
#ifdefHAVE_MALLOC_SIZE
update_zmalloc_stat_alloc(zmalloc_size(ptr));
returnptr;
#else
*((size_t*)ptr)=size;
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return(char*)ptr+PREFIX_SIZE;
#endif
}
//這個也是呼叫malloc來實現,和普通的realloc基本相同
void*zrealloc(void*ptr,size_tsize){
#ifndefHAVE_MALLOC_SIZE
void*realptr;
#endif
size_toldsize;
void*newptr;
if(size==0&&ptr!=NULL){
zfree(ptr);
returnNULL;
}
if(ptr==NULL)returnzmalloc(size);
#ifdefHAVE_MALLOC_SIZE
oldsize=zmalloc_size(ptr);
newptr=realloc(ptr,size);
if(!newptr)zmalloc_oom_handler(size);
update_zmalloc_stat_free(oldsize);
update_zmalloc_stat_alloc(zmalloc_size(newptr));
returnnewptr;
#else
realptr=(char*)ptr-PREFIX_SIZE;
oldsize=*((size_t*)realptr);
newptr=realloc(realptr,size+PREFIX_SIZE);
if(!newptr)zmalloc_oom_handler(size);
*((size_t*)newptr)=size;
update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
update_zmalloc_stat_alloc(size+PREFIX_SIZE);
return(char*)newptr+PREFIX_SIZE;
#endif
}
再看一下記憶體的釋放:
voidzfree(void*ptr){
#ifndefHAVE_MALLOC_SIZE
void*realptr;
size_toldsize;
#endif
if(ptr==NULL)return;
#ifdefHAVE_MALLOC_SIZE
update_zmalloc_stat_free(zmalloc_size(ptr));
free(ptr);
#else
realptr=(char*)ptr-PREFIX_SIZE;
oldsize=*((size_t*)realptr);
update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
free(realptr);
#endif
}
釋放函式和C的基本相同,就是把相關的記憶體回收到堆中去。其中需要注意的是拿到記憶體的真正的起始位置然後再釋放,不要弄錯了。這一段的難點在於,如果對C語言的巨集不清楚的話,一系列的預定義巨集和編譯巨集,還有巨集函式,往往會讓你眼花繚亂。
其它的函式,包括獲得當前程序中的方法,其實就是對Linux下的檔案的操作,這個如果熟悉在LINUX上檢視記憶體等資源的使用的就非常明白了。
2、記憶體的大小管理
在前面反覆提到過記憶體大小控制,看一下定義:
staticsize_tused_memory=0;
pthread_mutex_tused_memory_mutex=PTHREAD_MUTEX_INITIALIZER;
size_tzmalloc_used_memory(void){
size_tum;
atomicGet(used_memory,um);
returnum;
}
通過函式可以得到的used_memory,而這個函式會在記憶體需要釋放時呼叫檢查:
intfreeMemoryIfNeeded(void){
intkeys_freed=0;
.....
/*Finallyremovetheselectedkey.*/
if(bestkey){
......//下面兩處
delta=(longlong)zmalloc_used_memory();
......
delta-=(longlong)zmalloc_used_memory();
......
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
returnC_OK;
cant_free:
while(bioPendingJobsOfType(BIO_LAZY_FREE)){
//下面一處
if(((mem_reported-zmalloc_used_memory())+mem_freed)>=mem_tofree)
break;
usleep(1000);
}
returnC_ERR;
}
當記憶體不足時,需要呼叫策略進行記憶體的回收。記憶體回收很簡單,一個是刪除到期時間物件,另外一個當used_memory到達觸發點後,進行強制刪除物件。
3、記憶體回收策略LRU和LFU
記憶體的過期刪除有主動方式、被動方式和從庫過期指令,另外就是記憶體超限的策略,前面幾個都好理解,這裡重點看最後的策略:
先看一下資料結構:
structredisServer{
/*General*/
pid_tpid;/*Mainprocesspid.*/
char*configfile;/*Absoluteconfigfilepath,orNULL*/
...
_Atomicunsignedintlruclock;/*ClockforLRUeviction*/
...
}
typedefstructredisObject{
unsignedtype:4;
unsignedencoding:4;
unsignedlru:LRU_BITS;/*LRUtime(relativetogloballru_clock)or
*LFUdata(leastsignificant8bitsfrequency
*andmostsignificant16bitsaccesstime).*/
intrefcount;
void*ptr;
}robj;
LRU其實就是OS中的最少最近使用的方法,如果好好看過作業系統相關的排程,應該知道它是怎麼回事兒。他的缺點在於如果一個KEY剛剛使用過,可能以後都不再用了,那麼就會有一些問題,而LFU就是增加了一個最近使用頻率,其實就是排除了這種情況。
intfreeMemoryIfNeeded(void){
intkeys_freed=0;
/*Bydefaultreplicasshouldignoremaxmemory
*andjustbemastersexactcopies.*/
if(server.masterhost&&server.repl_slave_ignore_maxmemory)returnC_OK;
size_tmem_reported,mem_tofree,mem_freed;
mstime_tlatency,eviction_latency;
longlongdelta;
intslaves=listLength(server.slaves);
/*Whenclientsarepausedthedatasetshouldbestaticnotjustfromthe
*POVofclientsnotbeingabletowrite,butalsofromthePOVof
*expiresandevictionsofkeysnotbeingperformed.*/
if(clientsArePaused())returnC_OK;
if(getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL)==C_OK)
returnC_OK;
mem_freed=0;
if(server.maxmemory_policy==MAXMEMORY_NO_EVICTION)
gotocant_free;/*Weneedtofreememory,butpolicyforbids.*/
latencyStartMonitor(latency);
//迴圈判斷是否滿足記憶體的需要,直到達到為止
while(mem_freed<mem_tofree){
intj,k,i;
staticunsignedintnext_db=0;
sdsbestkey=NULL;
intbestdbid;
redisDb*db;
dict*dict;
dictEntry*de;
//不同策略的過期池
if(server.maxmemory_policy&(MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU)||
server.maxmemory_policy==MAXMEMORY_VOLATILE_TTL)
{
structevictionPoolEntry*pool=EvictionPoolLRU;
while(bestkey==NULL){
unsignedlongtotal_keys=0,keys;
/*Wedon'twanttomakelocal-dbchoiceswhenexpiringkeys,
*sotostartpopulatetheevictionpoolsamplingkeysfrom
*everyDB.*/
//迴圈每個資料庫,根據策略查詢KEY並放入POOL中
for(i=0;i<server.dbnum;i++){
db=server.db+i;
dict=(server.maxmemory_policy&MAXMEMORY_FLAG_ALLKEYS)?
db->dict:db->expires;
if((keys=dictSize(dict))!=0){
evictionPoolPopulate(i,dict,db->dict,pool);
total_keys+=keys;
}
}
if(!total_keys)break;/*Nokeystoevict.*/
/*Gobackwardfrombesttoworstelementtoevict.*/
//後臺刪除
for(k=EVPOOL_SIZE-1;k>=0;k--){
if(pool[k].key==NULL)continue;
bestdbid=pool[k].dbid;
if(server.maxmemory_policy&MAXMEMORY_FLAG_ALLKEYS){
de=dictFind(server.db[pool[k].dbid].dict,
pool[k].key);
}else{
de=dictFind(server.db[pool[k].dbid].expires,
pool[k].key);
}
/*Removetheentryfromthepool.*/
if(pool[k].key!=pool[k].cached)
sdsfree(pool[k].key);
pool[k].key=NULL;
pool[k].idle=0;
/*Ifthekeyexists,isourpick.Otherwiseitis
*aghostandweneedtotrythenextelement.*/
if(de){
bestkey=dictGetKey(de);
break;
}else{
/*Ghost...Iterateagain.*/
}
}
}
}
/*volatile-randomandallkeys-randompolicy*/
//隨機策略忽略
elseif(server.maxmemory_policy==MAXMEMORY_ALLKEYS_RANDOM||
server.maxmemory_policy==MAXMEMORY_VOLATILE_RANDOM)
{
/*Whenevictingarandomkey,wetrytoevictakeyfor
*eachDB,soweusethestatic'next_db'variableto
*incrementallyvisitallDBs.*/
for(i=0;i<server.dbnum;i++){
j=(++next_db)%server.dbnum;
db=server.db+j;
dict=(server.maxmemory_policy==MAXMEMORY_ALLKEYS_RANDOM)?
db->dict:db->expires;
if(dictSize(dict)!=0){
de=dictGetRandomKey(dict);
bestkey=dictGetKey(de);
bestdbid=j;
break;
}
}
}
/*Finallyremovetheselectedkey.*/
if(bestkey){
db=server.db+bestdbid;
robj*keyobj=createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,server.lazyfree_lazy_eviction);
/*Wecomputetheamountofmemoryfreedbydb*Delete()alone.
*Itispossiblethatactuallythememoryneededtopropagate
*theDELinAOFandreplicationlinkisgreaterthantheone
*wearefreeingremovingthekey,butwecan'taccountfor
*thatotherwisewewouldneverexittheloop.
*
*AOFandOutputbuffermemorywillbefreedeventuallyso
*weonlycareaboutmemoryusedbythekeyspace.*/
delta=(longlong)zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if(server.lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta-=(longlong)zmalloc_used_memory();
mem_freed+=delta;
server.stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED,"evicted",
keyobj,db->id);
decrRefCount(keyobj);
keys_freed++;
/*Whenthememorytofreestartstobebigenough,wemay
*startspendingsomuchtimeherethatisimpossibleto
*deliverdatatotheslavesfastenough,soweforcethe
*transmissionhereinsidetheloop.*/
if(slaves)flushSlavesOutputBuffers();
/*Normallyourstopconditionistheabilitytorelease
*afixed,pre-computedamountofmemory.Howeverwhenwe
*aredeletingobjectsinanotherthread,it'sbetterto
*check,fromtimetotime,ifwealreadyreachedourtarget
*memory,sincethe"mem_freed"amountiscomputedonly
*acrossthedbAsyncDelete()call,whilethethreadcan
*releasethememoryallthetime.*/
if(server.lazyfree_lazy_eviction&&!(keys_freed%16)){
if(getMaxmemoryState(NULL,NULL,NULL,NULL)==C_OK){
/*Let'ssatisfyourstopcondition.*/
mem_freed=mem_tofree;
}
}
}else{
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
gotocant_free;/*nothingtofree...*/
}
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("eviction-cycle",latency);
returnC_OK;
cant_free:
/*Wearehereifwearenotabletoreclaimmemory.Thereisonlyone
*lastthingwecantry:checkifthelazyfreethreadhasjobsinqueue
*andwait...*/
while(bioPendingJobsOfType(BIO_LAZY_FREE)){
if(((mem_reported-zmalloc_used_memory())+mem_freed)>=mem_tofree)
break;
usleep(1000);
}
returnC_ERR;
}
intfreeMemoryIfNeededAndSafe(void){
if(server.lua_timedout||server.loading)returnC_OK;
returnfreeMemoryIfNeeded();
}
在Redis中有一個升序的過期池,注意,它只是應用於非隨機的策略中。
#defineEVPOOL_SIZE16
#defineEVPOOL_CACHED_SDS_SIZE255
structevictionPoolEntry{
unsignedlonglongidle;/*Objectidletime(inversefrequencyforLFU)*/
sdskey;/*Keyname.*/
sdscached;/*CachedSDSobjectforkeyname.*/
intdbid;/*KeyDBnumber.*/
};
staticstructevictionPoolEntry*EvictionPoolLRU;
/*ReturntheLRUclock,basedontheclockresolution.Thisisatime
*inareduced-bitsformatthatcanbeusedtosetandcheckthe
*object->lrufieldofredisObjectstructures.*/
unsignedintgetLRUClock(void){
return(mstime()/LRU_CLOCK_RESOLUTION)&LRU_CLOCK_MAX;
}
/*ThisfunctionisusedtoobtainthecurrentLRUclock.
*Ifthecurrentresolutionislowerthanthefrequencywerefreshthe
*LRUclock(asitshouldbeinproductionservers)wereturnthe
*precomputedvalue,otherwiseweneedtoresorttoasystemcall.*/
//LRU時鐘計算的方法
unsignedintLRU_CLOCK(void){
unsignedintlruclock;
if(1000/server.hz<=LRU_CLOCK_RESOLUTION){
lruclock=server.lruclock;
}else{
lruclock=getLRUClock();
}
returnlruclock;
}
/*Givenanobjectreturnstheminnumberofmillisecondstheobjectwasnever
*requested,usinganapproximatedLRUalgorithm.*/
//近似的LRU估算演算法
unsignedlonglongestimateObjectIdleTime(robj*o){
unsignedlonglonglruclock=LRU_CLOCK();
if(lruclock>=o->lru){
return(lruclock-o->lru)*LRU_CLOCK_RESOLUTION;
}else{
return(lruclock+(LRU_CLOCK_MAX-o->lru))*
LRU_CLOCK_RESOLUTION;
}
}
//從sampledict隨機挑選物件並計算LRU,以升序插入POOL中
voidevictionPoolPopulate(intdbid,dict*sampledict,dict*keydict,structevictionPoolEntry*pool){
intj,k,count;
dictEntry*samples[server.maxmemory_samples];
count=dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
for(j=0;j<count;j++){
unsignedlonglongidle;
sdskey;
robj*o;
dictEntry*de;
de=samples[j];
key=dictGetKey(de);
/*Ifthedictionarywearesamplingfromisnotthemain
*dictionary(buttheexpiresone)weneedtolookupthekey
*againinthekeydictionarytoobtainthevalueobject.*/
if(server.maxmemory_policy!=MAXMEMORY_VOLATILE_TTL){
if(sampledict!=keydict)de=dictFind(keydict,key);
o=dictGetVal(de);
}
/*Calculatetheidletimeaccordingtothepolicy.Thisiscalled
*idlejustbecausethecodeinitiallyhandledLRU,butisinfact
*justascorewhereanhigherscoremeansbettercandidate.*/
if(server.maxmemory_policy&MAXMEMORY_FLAG_LRU){
idle=estimateObjectIdleTime(o);
}elseif(server.maxmemory_policy&MAXMEMORY_FLAG_LFU){
/*WhenweuseanLRUpolicy,wesortthekeysbyidletime
*sothatweexpirekeysstartingfromgreateridletime.
*HoweverwhenthepolicyisanLFUone,wehaveafrequency
*estimation,andwewanttoevictkeyswithlowerfrequency
*first.Soinsidethepoolweputobjectsusingtheinverted
*frequencysubtractingtheactualfrequencytothemaximum
*frequencyof255.*/
idle=255-LFUDecrAndReturn(o);
}elseif(server.maxmemory_policy==MAXMEMORY_VOLATILE_TTL){
/*Inthiscasethesoonertheexpirethebetter.*/
idle=ULLONG_MAX-(long)dictGetVal(de);
}else{
serverPanic("UnknownevictionpolicyinevictionPoolPopulate()");
}
/*Inserttheelementinsidethepool.
*First,findthefirstemptybucketorthefirstpopulated
*bucketthathasanidletimesmallerthanouridletime.*/
k=0;
while(k<EVPOOL_SIZE&&
pool[k].key&&
pool[k].idle<idle)k++;
if(k==0&&pool[EVPOOL_SIZE-1].key!=NULL){
/*Can'tinsertiftheelementis<theworstelementwehave
*andtherearenoemptybuckets.*/
continue;
}elseif(k<EVPOOL_SIZE&&pool[k].key==NULL){
/*Insertingintoemptyposition.Nosetupneededbeforeinsert.*/
}else{
/*Insertinginthemiddle.Nowkpointstothefirstelement
*greaterthantheelementtoinsert.*/
if(pool[EVPOOL_SIZE-1].key==NULL){
/*Freespaceontheright?Insertatkshifting
*alltheelementsfromktoendtotheright.*/
/*SaveSDSbeforeoverwriting.*/
sdscached=pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached=cached;
}else{
/*Nofreespaceonright?Insertatk-1*/
k--;
/*Shiftallelementsontheleftofk(included)tothe
*left,sowediscardtheelementwithsmalleridletime.*/
sdscached=pool[0].cached;/*SaveSDSbeforeoverwriting.*/
if(pool[0].key!=pool[0].cached)sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached=cached;
}
}
/*TrytoreusethecachedSDSstringallocatedinthepoolentry,
*becauseallocatinganddeallocatingthisobjectiscostly
*(accordingtotheprofiler,notmyfantasy.Remember:
*prematureoptimizblablablabla.*/
intklen=sdslen(key);
if(klen>EVPOOL_CACHED_SDS_SIZE){
pool[k].key=sdsdup(key);
}else{
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key=pool[k].cached;
}
pool[k].idle=idle;
pool[k].dbid=dbid;
}
}
之所以不再使用LRU演算法,主要原因是雜湊+雙向連結串列太耗費空間,不如使用近似的LRU演算法。它在KEY上增加了一個時間戳,通過隨機取樣中配置的數量,然後將其刪除,如果達到目的即停止否則再次進行。
再看一下LFU:
unsignedlongLFUGetTimeInMinutes(void){
return(server.unixtime/60)&65535;
}
/*Givenanobjectlastaccesstime,computetheminimumnumberofminutes
*thatelapsedsincethelastaccess.Handleoverflow(ldtgreaterthan
*thecurrent16bitsminutestime)consideringthetimeaswrapping
*exactlyonce.*/
unsignedlongLFUTimeElapsed(unsignedlongldt){
unsignedlongnow=LFUGetTimeInMinutes();
if(now>=ldt)returnnow-ldt;
return65535-ldt+now;
}
/*Logarithmicallyincrementacounter.Thegreateristhecurrentcountervalue
*thelesslikelyisthatitgetsreallyimplemented.Saturateitat255.*/
uint8_tLFULogIncr(uint8_tcounter){
if(counter==255)return255;
doubler=(double)rand()/RAND_MAX;
doublebaseval=counter-LFU_INIT_VAL;
if(baseval<0)baseval=0;
doublep=1.0/(baseval*server.lfu_log_factor+1);
if(r<p)counter++;
returncounter;
}
/*IftheobjectdecrementtimeisreacheddecrementtheLFUcounterbut
*donotupdateLFUfieldsoftheobject,weupdatetheaccesstime
*andcounterinanexplicitwaywhentheobjectisreallyaccessed.
*Andwewilltimeshalvethecounteraccordingtothetimesof
*elapsedtimethanserver.lfu_decay_time.
*Returntheobjectfrequencycounter.
*
*Thisfunctionisusedinordertoscanthedatasetforthebestobject
*tofit:aswecheckforthecandidate,weincrementallydecrementthe
*counterofthescannedobjectsifneeded.*/
unsignedlongLFUDecrAndReturn(robj*o){
unsignedlongldt=o->lru>>8;
unsignedlongcounter=o->lru&255;
unsignedlongnum_periods=server.lfu_decay_time?LFUTimeElapsed(ldt)/server.lfu_decay_time:0;
if(num_periods)
counter=(num_periods>counter)?0:counter-num_periods;
returncounter;
}
LFU劃分key物件的內部時鐘的24位為兩塊,前16位表示時鐘,後8位表示一個計數器。16位以小時為單位。後8位表示當前key值的讀寫頻率,8位最高是255,可redis並沒有採用線性上升的方式,而是通過配置引數調整一個複雜的公式動態控制資料的增減速度。
lfu-log-factor 可以調整計數器counter的增長速度,lfu-log-factor越大,counter增長的越慢。
lfu-decay-time 是一個以分鐘為單位的數值,可以調整counter的減少速度。
同時為了保證新生KEY不被快速刪除,預設值提高到5.
4、記憶體的操作
這個底層的記憶體控制其實是應用到上層的所有的資料結構,看下面的list建立:
list*listCreate(void)
{
structlist*list;
if((list=zmalloc(sizeof(*list)))==NULL)
returnNULL;
list->head=list->tail=NULL;
list->len=0;
list->dup=NULL;
list->free=NULL;
list->match=NULL;
returnlist;
}
換句話說,所有的資料結構的操作都是在這個基礎之上進行的。看一下前面幾個資料結構的分析就明白了。不過正如前面所言,REDIS的操作,是按照固定的大小來分配的,也就是說,如果你申請8個位元組,會給你8個位元組,但申請12個會給你16個位元組。
同樣,修改也可能造成記憶體空間的增加或者減少,比如原來KV中Value是“ISOK”,可能修改成“ISFALSE”或者“NIL”。這樣,記憶體碎片就出現了,由此可以明白刪除也會出現這種問題。那解決這個問題,就得需要空間的轉換了,其實就是把附近的記憶體碎片通過不斷的遷移,形成一個大塊的可用記憶體。
可能很快就想到了,移動記憶體又佔用CPU又佔用一部分記憶體,所以需要有條件的進行:
active-defrag-ignore-bytes 100mb:碎片到100MB時,啟動清理。
active-defrag-threshold-lower 10:當碎片超過 10% 時,啟動清理。
active-defrag-threshold-upper 100:記憶體碎片超過 100%,最大清理。
只有CPU滿足下列條件才會工作:
active-defrag-cycle-min 5:佔用 CPU 時間的比例不低於此值。
active-defrag-cycle-max 75:佔用 CPU 時間的比例不高於此值。超過後自動停止,防止影響使用。
voidcomputeDefragCycles(){
size_tfrag_bytes;
floatfrag_pct=getAllocatorFragmentation(&frag_bytes);
/*Ifwe'renotalreadyrunning,andbelowthethreshold,exit.*/
if(!server.active_defrag_running){
if(frag_pct<server.active_defrag_threshold_lower||frag_bytes<server.active_defrag_ignore_bytes)
return;
}
//看到下面的相關變量了吧
/*Calculatetheadaptiveaggressivenessofthedefrag*/
//CPU計算,這個可以用在自己的程式碼中
intcpu_pct=INTERPOLATE(frag_pct,
server.active_defrag_threshold_lower,
server.active_defrag_threshold_upper,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
cpu_pct=LIMIT(cpu_pct,
server.active_defrag_cycle_min,
server.active_defrag_cycle_max);
/*Weallowincreasingtheaggressivenessduringascan,butdon't
*reduceit.*/
if(!server.active_defrag_running||
cpu_pct>server.active_defrag_running)
{
server.active_defrag_running=cpu_pct;
serverLog(LL_VERBOSE,
"Startingactivedefrag,frag=%.0f%%,frag_bytes=%zu,cpu=%d%%",
frag_pct,frag_bytes,cpu_pct);
}
}
voidactiveDefragCycle(void){
staticintcurrent_db=-1;
staticunsignedlongcursor=0;
staticredisDb*db=NULL;
staticlonglongstart_scan,start_stat;
unsignedintiterations=0;
unsignedlonglongprev_defragged=server.stat_active_defrag_hits;
unsignedlonglongprev_scanned=server.stat_active_defrag_scanned;
longlongstart,timelimit,endtime;
mstime_tlatency;
intquit=0;
if(!server.active_defrag_enabled){
if(server.active_defrag_running){
/*ifactivedefragwasdisabledmid-run,startfromfreshnexttime.*/
server.active_defrag_running=0;
if(db)
listEmpty(db->defrag_later);
defrag_later_current_key=NULL;
defrag_later_cursor=0;
current_db=-1;
cursor=0;
db=NULL;
}
return;
}
if(hasActiveChildProcess())
return;/*Defraggingmemorywhilethere'saforkwilljustdodamage.*/
/*Onceasecond,checkifwethefragmentationjustfiesstartingascan
*ormakingitmoreaggressive.*/
run_with_period(1000){
computeDefragCycles();
}
if(!server.active_defrag_running)
return;
/*SeeactiveExpireCycleforhowtimelimitishandled.*/
start=ustime();
timelimit=1000000*server.active_defrag_running/server.hz/100;
if(timelimit<=0)timelimit=1;
endtime=start+timelimit;
latencyStartMonitor(latency);
do{
/*ifwe'renotcontinuingascanfromthelastcallorloop,startanewone*/
if(!cursor){
/*finishanyleftoversfrompreviousdbbeforemovingtothenextone*/
if(db&&defragLaterStep(db,endtime)){
quit=1;/*timeisup,wedidn'tfinishallthework*/
break;/*thiswillexitthefunctionandwe'llcontinueonthenextcycle*/
}
/*Moveontonextdatabase,andstopifwereachedthelastone.*/
if(++current_db>=server.dbnum){
/*defragotheritemsnotpartofthedb/keys*/
defragOtherGlobals();
longlongnow=ustime();
size_tfrag_bytes;
floatfrag_pct=getAllocatorFragmentation(&frag_bytes);
serverLog(LL_VERBOSE,
"Activedefragdonein%dms,reallocated=%d,frag=%.0f%%,frag_bytes=%zu",
(int)((now-start_scan)/1000),(int)(server.stat_active_defrag_hits-start_stat),frag_pct,frag_bytes);
start_scan=now;
current_db=-1;
cursor=0;
db=NULL;
server.active_defrag_running=0;
computeDefragCycles();/*ifanotherscanisneeded,startitrightaway*/
if(server.active_defrag_running!=0&&ustime()<endtime)
continue;
break;
}
elseif(current_db==0){
/*Startascanfromthefirstdatabase.*/
start_scan=ustime();
start_stat=server.stat_active_defrag_hits;
}
db=&server.db[current_db];
cursor=0;
}
do{
/*beforescanningthenextbucket,seeifwehavebigkeysleftfromthepreviousbuckettoscan*/
if(defragLaterStep(db,endtime)){
quit=1;/*timeisup,wedidn'tfinishallthework*/
break;/*thiswillexitthefunctionandwe'llcontinueonthenextcycle*/
}
cursor=dictScan(db->dict,cursor,defragScanCallback,defragDictBucketCallback,db);
/*Oncein16scaniterations,512pointerreallocations.or64keys
*(ifwehavealotofpointersinonehashbucketorrehasing),
*checkifwereachedthetimelimit.
*Butregardless,don'tstartanewdbinthisloop,thisisbecauseafter
*thelastdbwecalldefragOtherGlobals,whichmustbedoneinoncecycle*/
if(!cursor||(++iterations>16||
server.stat_active_defrag_hits-prev_defragged>512||
server.stat_active_defrag_scanned-prev_scanned>64)){
if(!cursor||ustime()>endtime){
quit=1;
break;
}
iterations=0;
prev_defragged=server.stat_active_defrag_hits;
prev_scanned=server.stat_active_defrag_scanned;
}
}while(cursor&&!quit);
}while(!quit);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle",latency);
}
這個的操作仍然是在serverCron函式中呼叫databasesCron來呼叫這個清理函式。
5、記憶體的溢位管理
在Redis中記憶體溢位其實和上面的記憶體回收有相當大的關係,在記憶體到達上限後Redis的處理方法有幾種可選:
noeviction:預設策略,不刪除任何資料,但是會拒絕服務並返回錯誤。(error)OOM command not allowed when used memory,出現這種情況Redis只可進行讀操作。
volatile-lru:由LRU演算法刪除設定了超時屬性(expire)的鍵,一直到空間滿足需求為止。假如沒有可刪除物件,則轉到noeviction策略。
allkeys-lru:由LRU演算法刪除鍵,刪除到空間滿足。
allkeys-random:隨機刪除所有鍵,到空間滿足。
volatile-random:隨機刪除過期鍵,到空間滿足。
volatile-ttl:根據鍵值物件的ttl屬性,刪除最近將要過期資料。沒有則轉vnoeviction策略。
volatile-lfu:從配置過期時間的鍵中刪除使用頻率最少的鍵
allkeys-lfu:從鍵中刪除使用頻率最少的鍵
這個策略是可以通過config進行配置的。不過需要注意的是,如果設定了最大記憶體的引數,記憶體一旦溢位,同時設定為非noeviction 策略時,會頻繁的進行記憶體的回收,這會嚴重影響Redis的效能。
三、總結
記憶體管理一般在實際情況都是在原生的庫的基礎上匹配出一個記憶體池(有複雜的有簡單的,統一叫記憶體池),對記憶體進行動態的管理和控制,Redis看來明白借鑑大牛們的技術更方便和安全一些,所以直接根據實際情況取捨來呼叫不同的記憶體管理庫,這其實也是其高明之處,畢竟Redis是以消耗記憶體為主的資料庫。“他山之石,可以攻玉”,做好自己的工作,把優點亮化,這也是Redis成功的一個原因吧,這可以做為國內開發者的一個借鑑。