Golang 實現 Redis(4): AOF 持久化與AOF重寫
阿新 • • 發佈:2020-04-09
本文是使用 golang 實現 redis 系列的第四篇文章,將介紹如何使用 golang 實現 Append Only File 持久化及 AOF 檔案重寫。
本文完整原始碼在作者GithubHDT3213/godis
AOF 檔案
AOF 持久化是典型的非同步任務,主協程(goroutine) 可以使用 channel 將資料傳送到非同步協程由非同步協程執行持久化操作。
在 DB 中定義相關欄位:
type DB struct { // 主執行緒使用此channel將要持久化的命令傳送到非同步協程 aofChan chan *reply.MultiBulkReply // append file 檔案描述符 aofFile *os.File // append file 路徑 aofFilename string // aof 重寫需要的緩衝區,將在AOF重寫一節詳細介紹 aofRewriteChan chan *reply.MultiBulkReply // 在必要的時候使用此欄位暫停持久化操作 pausingAof sync.RWMutex }
在進行持久化時需要注意兩個細節:
- get 之類的讀命令並不需要進行持久化
- expire 命令要用等效的 expireat 命令替換。舉例說明,10:00 執行
expire a 3600
表示鍵 a 在 11:00 過期,在 10:30 載入AOF檔案時執行expire a 3600
就成了 11:30 過期與原資料不符。
我們在命令處理方法中返回 AOF 需要的額外資訊:
type extra struct { // 表示該命令是否需要持久化 toPersist bool // 如上文所述 expire 之類的命令不能直接持久化 // 若 specialAof == nil 則將命令原樣持久化,否則持久化 specialAof 中的指令 specialAof []*reply.MultiBulkReply } type CmdFunc func(db *DB, args [][]byte) (redis.Reply, *extra)
以 SET 命令為例:
func Set(db *DB, args [][]byte) (redis.Reply, *extra) { //.... var result int switch policy { case upsertPolicy: result = db.Put(key, entity) case insertPolicy: result = db.PutIfAbsent(key, entity) case updatePolicy: result = db.PutIfExists(key, entity) } extra := &extra{toPersist: result > 0} // 若實際寫入了資料則toPresist=true, 若因為XX或NX選項沒有實際寫入資料則toPresist=false if result > 0 { if ttl != unlimitedTTL { // 使用了 EX 或 NX 選項 expireTime := time.Now().Add(time.Duration(ttl) * time.Millisecond) db.Expire(key, expireTime) // 持久化時使用 set key value 和 pexpireat 命令代替 set key value EX ttl 命令 extra.specialAof = []*reply.MultiBulkReply{ reply.MakeMultiBulkReply([][]byte{ []byte("SET"), args[0], args[1], }), makeExpireCmd(key, expireTime), } } else { db.Persist(key) // override ttl } } return &reply.OkReply{}, extra } var pExpireAtCmd = []byte("PEXPIREAT") func makeExpireCmd(key string, expireAt time.Time) *reply.MultiBulkReply { args := make([][]byte, 3) args[0] = pExpireAtCmd args[1] = []byte(key) args[2] = []byte(strconv.FormatInt(expireAt.UnixNano()/1e6, 10)) return reply.MakeMultiBulkReply(args) }
在處理命令的排程方法中將 aof 命令傳送到 channel:
func (db *DB) Exec(c redis.Client, args [][]byte) (result redis.Reply) {
// ....
// normal commands
var extra *extra
cmdFunc, ok := router[cmd] // 找到命令對應的處理函式
if !ok {
return reply.MakeErrReply("ERR unknown command '" + cmd + "'")
}
// 使用處理函式執行命令
if len(args) > 1 {
result, extra = cmdFunc(db, args[1:])
} else {
result, extra = cmdFunc(db, [][]byte{})
}
// AOF 持久化
if config.Properties.AppendOnly {
if extra != nil && extra.toPersist {
// 寫入 specialAof
if extra.specialAof != nil && len(extra.specialAof) > 0 {
for _, r := range extra.specialAof {
db.addAof(r)
}
} else {
// 寫入原始命令
r := reply.MakeMultiBulkReply(args)
db.addAof(r)
}
}
}
return
}
在非同步協程中寫入命令:
func (db *DB) handleAof() {
for cmd := range db.aofChan {
// 非同步協程在持久化之前會嘗試獲取鎖,若其他協程持有鎖則會暫停持久化操作
// 鎖也保證了每次寫入完整的一條指令不會格式錯誤
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
db.aofRewriteChan <- cmd
}
_, err := db.aofFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}
讀取過程與協議解析器一節基本相同,不在正文中贅述:loadAof。
AOF 重寫
若我們對鍵a賦值100次會在AOF檔案中產生100條指令但只有最後一條指令是有效的,為了減少持久化檔案的大小需要進行AOF重寫以刪除無用的指令。
重寫必須在固定不變的資料集上進行,不能直接使用記憶體中的資料。Redis 重寫的實現方式是進行 fork 並在子程序中遍歷資料庫內的資料重新生成AOF檔案。由於 golang 不支援 fork 操作,我們只能採用讀取AOF檔案生成副本的方式來代替fork。
在進行AOF重寫操作時需要滿足兩個要求:
- 若 AOF 重寫失敗或被中斷,AOF 檔案需保持重寫之前的狀態不能丟失資料
- 進行 AOF 重寫期間執行的命令必須儲存到新的AOF檔案中, 不能丟失
因此我們設計了一套比較複雜的流程:
- 暫停AOF寫入 -> 更改狀態為重寫中 -> 複製當前AOF檔案 -> 恢復AOF寫入
- 在重寫過程中,持久化協程在將命令寫入檔案的同時也將其寫入記憶體中的重寫快取區
- 重寫協程讀取AOF副本並將重寫到臨時檔案(tmp.aof)中
- 暫停AOF寫入 -> 將重寫緩衝區中的命令寫入tmp.aof -> 使用臨時檔案tmp.aof覆蓋AOF檔案(使用檔案系統的mv命令保證安全)-> 清空重寫緩衝區 -> 恢復AOF寫入
在不阻塞線上服務的同時進行其它操作是一項必需的能力,AOF重寫的思路在解決這類問題時具有重要的參考價值。比如Mysql Online DDL: gh-ost採用了類似的策略保證資料一致。
首先準備開始重寫操作:
func (db *DB) startRewrite() (*os.File, error) {
// 暫停AOF寫入, 資料會在 db.aofChan 中暫時堆積
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
// 建立重寫緩衝區
db.aofRewriteChan = make(chan *reply.MultiBulkReply, aofQueueSize)
// 建立臨時檔案
file, err := ioutil.TempFile("", "aof")
if err != nil {
logger.Warn("tmp file create failed")
return nil, err
}
return file, nil
}
在重寫過程中,持久化協程進行雙寫:
func (db *DB) handleAof() {
for cmd := range db.aofChan {
db.pausingAof.RLock()
if db.aofRewriteChan != nil {
// 資料寫入重寫緩衝區
db.aofRewriteChan <- cmd
}
_, err := db.aofFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
db.pausingAof.RUnlock()
}
}
執行重寫:
func (db *DB) aofRewrite() {
file, err := db.startRewrite()
if err != nil {
logger.Warn(err)
return
}
// load aof file
tmpDB := &DB{
Data: dict.MakeSimple(),
TTLMap: dict.MakeSimple(),
Locker: lock.Make(lockerSize),
interval: 5 * time.Second,
aofFilename: db.aofFilename,
}
tmpDB.loadAof()
// rewrite aof file
tmpDB.Data.ForEach(func(key string, raw interface{}) bool {
var cmd *reply.MultiBulkReply
entity, _ := raw.(*DataEntity)
switch val := entity.Data.(type) {
case []byte:
cmd = persistString(key, val)
case *List.LinkedList:
cmd = persistList(key, val)
case *set.Set:
cmd = persistSet(key, val)
case dict.Dict:
cmd = persistHash(key, val)
case *SortedSet.SortedSet:
cmd = persistZSet(key, val)
}
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
tmpDB.TTLMap.ForEach(func(key string, raw interface{}) bool {
expireTime, _ := raw.(time.Time)
cmd := makeExpireCmd(key, expireTime)
if cmd != nil {
_, _ = file.Write(cmd.ToBytes())
}
return true
})
db.finishRewrite(file)
}
重寫完畢後寫入緩衝區中的資料並替換正式檔案:
func (db *DB) finishRewrite(tmpFile *os.File) {
// 暫停AOF寫入
db.pausingAof.Lock()
defer db.pausingAof.Unlock()
// 將重寫緩衝區內的資料寫入臨時檔案
// 因為handleAof已被暫停,在遍歷期間aofRewriteChan中不會有新資料
loop:
for {
select {
case cmd := <-db.aofRewriteChan:
_, err := tmpFile.Write(cmd.ToBytes())
if err != nil {
logger.Warn(err)
}
default:
// 只有 channel 為空時才會進入此分支
break loop
}
}
// 釋放重寫緩衝區
close(db.aofRewriteChan)
db.aofRewriteChan = nil
// 使用臨時檔案代替aof檔案
_ = db.aofFile.Close()
_ = os.Rename(tmpFile.Name(), db.aofFilename)
// 重新開啟檔案描述符以保證正常寫入
aofFile, err := os.OpenFile(db.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
panic(err)
}
db.aofFile = aofFile
}