1. 程式人生 > 其它 >@data 重寫set方法_Go 專案實戰:實現一個 Redis(4) 之AOF 持久化與AOF重寫

@data 重寫set方法_Go 專案實戰:實現一個 Redis(4) 之AOF 持久化與AOF重寫

技術標籤:@data 重寫set方法

點選上方藍色“ Go語言中文網 ”關注, 每天一起學 Go

本文是使用 golang 實現 redis 系列的第四篇文章,將介紹如何使用 golang 實現 Append Only File 持久化及 AOF 檔案重寫。

本文完整原始碼在作者Github:https://github.com/HDT3213/godis/blob/master/src/db/aof.go

AOF 檔案

AOF 持久化是典型的非同步任務,主協程(goroutine) 可以使用 channel 將資料傳送到非同步協程由非同步協程執行持久化操作。

在 DB 中定義相關欄位:

typeDBstruct{
//主執行緒使用此channel將要持久化的命令傳送到非同步協程
aofChanchan*reply.MultiBulkReply
//appendfile檔案描述符
aofFile*os.File
//appendfile路徑
aofFilenamestring

//aof重寫需要的緩衝區,將在AOF重寫一節詳細介紹
aofRewriteChanchan*reply.MultiBulkReply
//在必要的時候使用此欄位暫停持久化操作
pausingAofsync.RWMutex
}

在進行持久化時需要注意兩個細節:

  1. get 之類的讀命令並不需要進行持久化
  2. expire 命令要用等效的 expireat 命令替換。舉例說明,10:00 執行 expire a 3600
    表示鍵 a 在 11:00 過期,在 10:30 載入AOF檔案時執行 expire a 3600 就成了 11:30 過期與原資料不符。

我們在命令處理方法中返回 AOF 需要的額外資訊:

typeextrastruct{
//表示該命令是否需要持久化
toPersistbool
//如上文所述expire之類的命令不能直接持久化
//若specialAof==nil則將命令原樣持久化,否則持久化specialAof中的指令
specialAof[]*reply.MultiBulkReply
}

typeCmdFuncfunc(db*DB,args[][]byte)(redis.Reply,*extra)

以 SET 命令為例:

funcSet(db*DB,args[][]byte)(redis.Reply,*extra){
//....
varresultint
switchpolicy{
caseupsertPolicy:
result=db.Put(key,entity)
caseinsertPolicy:
result=db.PutIfAbsent(key,entity)
caseupdatePolicy:
result=db.PutIfExists(key,entity)
}
extra:=&extra{toPersist:result>0}//若實際寫入了資料則toPresist=true,若因為XX或NX選項沒有實際寫入資料則toPresist=false
ifresult>0{
ifttl!=unlimitedTTL{//使用了EX或NX選項
expireTime:=time.Now().Add(time.Duration(ttl)*time.Millisecond)
db.Expire(key,expireTime)
//持久化時使用setkeyvalue和pexpireat命令代替setkeyvalueEXttl命令
extra.specialAof=[]*reply.MultiBulkReply{
reply.MakeMultiBulkReply([][]byte{
[]byte("SET"),
args[0],
args[1],
}),
makeExpireCmd(key,expireTime),
}
}else{
db.Persist(key)//overridettl
}
}
return&reply.OkReply{},extra
}

varpExpireAtCmd=[]byte("PEXPIREAT")

funcmakeExpireCmd(keystring,expireAttime.Time)*reply.MultiBulkReply{
args:=make([][]byte,3)
args[0]=pExpireAtCmd
args[1]=[]byte(key)
args[2]=[]byte(strconv.FormatInt(expireAt.UnixNano()/1e6,10))
returnreply.MakeMultiBulkReply(args)
}

在處理命令的排程方法中將 aof 命令傳送到 channel:

func(db*DB)Exec(credis.Client,args[][]byte)(resultredis.Reply){
//....
//normalcommands
varextra*extra
cmdFunc,ok:=router[cmd]//找到命令對應的處理函式
if!ok{
returnreply.MakeErrReply("ERRunknowncommand'"+cmd+"'")
}
//使用處理函式執行命令
iflen(args)>1{
result,extra=cmdFunc(db,args[1:])
}else{
result,extra=cmdFunc(db,[][]byte{})
}

//AOF持久化
ifconfig.Properties.AppendOnly{
ifextra!=nil&&extra.toPersist{
//寫入specialAof
ifextra.specialAof!=nil&&len(extra.specialAof)>0{
for_,r:=rangeextra.specialAof{
db.addAof(r)
}
}else{
//寫入原始命令
r:=reply.MakeMultiBulkReply(args)
db.addAof(r)
}
}
}
return
}

在非同步協程中寫入命令:

func(db*DB)handleAof(){
forcmd:=rangedb.aofChan{
//非同步協程在持久化之前會嘗試獲取鎖,若其他協程持有鎖則會暫停持久化操作
//鎖也保證了每次寫入完整的一條指令不會格式錯誤
db.pausingAof.RLock()
ifdb.aofRewriteChan!=nil{
db.aofRewriteChan}
_,err:=db.aofFile.Write(cmd.ToBytes())
iferr!=nil{
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}

讀取過程與協議解析器[1]一節基本相同,不在正文中贅述:loadAof https://github.com/HDT3213/godis/blob/master/src/db/aof.go。

AOF 重寫

若我們對鍵a賦值100次會在AOF檔案中產生100條指令但只有最後一條指令是有效的,為了減少持久化檔案的大小需要進行AOF重寫以刪除無用的指令。

重寫必須在固定不變的資料集上進行,不能直接使用記憶體中的資料。Redis 重寫的實現方式是進行 fork 並在子程序中遍歷資料庫內的資料重新生成AOF檔案。由於 golang 不支援 fork 操作,我們只能採用讀取AOF檔案生成副本的方式來代替fork。

在進行AOF重寫操作時需要滿足兩個要求:

  1. 若 AOF 重寫失敗或被中斷,AOF 檔案需保持重寫之前的狀態不能丟失資料
  2. 進行 AOF 重寫期間執行的命令必須儲存到新的AOF檔案中, 不能丟失

因此我們設計了一套比較複雜的流程:

  1. 暫停AOF寫入 -> 更改狀態為重寫中 -> 準備重寫 -> 恢復AOF寫入
  2. 在重寫過程中,持久化協程在將命令寫入檔案的同時也將其寫入記憶體中的重寫快取區
  3. 重寫協程讀取 AOF 檔案中的前一部分(重寫開始前的資料,不包括讀寫過程中寫入的資料)並重寫到臨時檔案(tmp.aof)中
  4. 暫停AOF寫入 -> 將重寫緩衝區中的命令寫入tmp.aof -> 使用臨時檔案tmp.aof覆蓋AOF檔案(使用檔案系統的mv命令保證安全)-> 清空重寫緩衝區 -> 恢復AOF寫入

在不阻塞線上服務的同時進行其它操作是一項必需的能力,AOF重寫的思路在解決這類問題時具有重要的參考價值。比如Mysql Online DDL: gh-ost[2]採用了類似的策略保證資料一致。

首先準備開始重寫操作:

func(db*DB)startRewrite()(*os.File,int64,error){
//暫停AOF寫入,資料會在db.aofChan中暫時堆積
db.pausingAof.Lock()
deferdb.pausingAof.Unlock()

//建立重寫緩衝區
db.aofRewriteChan=make(chan*reply.MultiBulkReply,aofQueueSize)

//讀取當前aof檔案大小,不讀取重寫過程中新寫入的內容
fileInfo,_:=os.Stat(db.aofFilename)
filesize:=fileInfo.Size()

//建立臨時檔案
file,err:=ioutil.TempFile("","aof")
iferr!=nil{
logger.Warn("tmpfilecreatefailed")
returnnil,0,err
}
returnfile,filesize,nil
}

在重寫過程中,持久化協程進行雙寫:

func(db*DB)handleAof(){
forcmd:=rangedb.aofChan{
db.pausingAof.RLock()
ifdb.aofRewriteChan!=nil{
//資料寫入重寫緩衝區
db.aofRewriteChan}
_,err:=db.aofFile.Write(cmd.ToBytes())
iferr!=nil{
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}

執行重寫:

func(db*DB)aofRewrite(){
file,fileSize,err:=db.startRewrite()
iferr!=nil{
logger.Warn(err)
return
}

//loadaoffile
tmpDB:=&DB{
Data:dict.MakeSimple(),
TTLMap:dict.MakeSimple(),
Locker:lock.Make(lockerSize),
interval:5*time.Second,

aofFilename:db.aofFilename,
}
//只讀取開始重寫前aof檔案的內容
tmpDB.loadAof(int(fileSize))

//rewriteaoffile
tmpDB.Data.ForEach(func(keystring,rawinterface{})bool{
varcmd*reply.MultiBulkReply
entity,_:=raw.(*DataEntity)
switchval:=entity.Data.(type){
case[]byte:
cmd=persistString(key,val)
case*List.LinkedList:
cmd=persistList(key,val)
case*set.Set:
cmd=persistSet(key,val)
casedict.Dict:
cmd=persistHash(key,val)
case*SortedSet.SortedSet:
cmd=persistZSet(key,val)

}
ifcmd!=nil{
_,_=file.Write(cmd.ToBytes())
}
returntrue
})
tmpDB.TTLMap.ForEach(func(keystring,rawinterface{})bool{
expireTime,_:=raw.(time.Time)
cmd:=makeExpireCmd(key,expireTime)
ifcmd!=nil{
_,_=file.Write(cmd.ToBytes())
}
returntrue
})

db.finishRewrite(file)
}

重寫完畢後寫入緩衝區中的資料並替換正式檔案:

func(db*DB)finishRewrite(tmpFile*os.File){
//暫停AOF寫入
db.pausingAof.Lock()
deferdb.pausingAof.Unlock()


//將重寫緩衝區內的資料寫入臨時檔案
//因為handleAof已被暫停,在遍歷期間aofRewriteChan中不會有新資料
loop:
for{
select{
casecmd:=_,err:=tmpFile.Write(cmd.ToBytes())
iferr!=nil{
logger.Warn(err)
}
default:
//只有channel為空時才會進入此分支
breakloop
}
}
//釋放重寫緩衝區
close(db.aofRewriteChan)
db.aofRewriteChan=nil

//使用臨時檔案代替aof檔案
_=db.aofFile.Close()
_=os.Rename(tmpFile.Name(),db.aofFilename)

//重新開啟檔案描述符以保證正常寫入
aofFile,err:=os.OpenFile(db.aofFilename,os.O_APPEND|os.O_CREATE|os.O_RDWR,0600)
iferr!=nil{
panic(err)
}
db.aofFile=aofFile
}

作者:finley

出處:https://www.cnblogs.com/Finley/p/12663636.html

版權:本作品採用「署名-非商業性使用-相同方式共享 4.0 國際[3]」許可協議進行許可。

參考資料

[1]

協議解析器: https://www.cnblogs.com/Finley/p/11923168.html

[2]

Mysql Online DDL: gh-ost: https://github.com/github/gh-ost

[3]

署名-非商業性使用-相同方式共享 4.0 國際: https://creativecommons.org/licenses/by-nc-sa/4.0/


推薦閱讀

  • Go 專案實戰:實現一個 Redis(3) 之實現記憶體資料庫

福利 我為大家整理了一份 從入門到進階的Go學習資料禮包 ,包含學習建議:入門看什麼,進階看什麼。 關注公眾號 「polarisxu」,回覆 ebook 獲取;還可以回覆「進群」,和數萬 Gopher 交流學習。

73f7fcac7b425315e1522124e49cf5b0.png