1. 程式人生 > >Golang 實現 Redis(3): 實現記憶體資料庫

Golang 實現 Redis(3): 實現記憶體資料庫

本文是 golang 實現 redis 系列的第三篇, 主要介紹如何實現記憶體KV資料庫。本文完整原始碼在作者Github: [HDT3213/godis](https://github.com/HDT3213/godis/blob/master/src/db) [db.go](https://github.com/HDT3213/godis/blob/master/src/db/db.go) 是記憶體資料庫的主要原始檔,db.Exec 方法會從協議解析器中獲得命令引數並呼叫相應的處理函式進行處理。 目錄: - [Concurrent Hash Map](#concurrent-hash-map) - [LockMap](#lockmap) - [TTL](#ttl) # Concurrent Hash Map KV 記憶體資料庫的核心是併發安全的雜湊表,常見的設計有幾種: - sync.map: golang 官方提供的併發雜湊表, 效能優秀但結構複雜不便於擴充套件 - juc.ConcurrentHashMap: java 的併發雜湊表採用分段鎖實現。在進行擴容時訪問雜湊表執行緒都將協助進行 rehash 操作,在 rehash 結束前所有的讀寫操作都會阻塞。因為快取資料庫中鍵值對數量巨大且對讀寫操作響應時間要求較高,使用juc的策略是不合適的。 - memcached hashtable: 在後臺執行緒進行 rehash 操作時,主執行緒會判斷要訪問的雜湊槽是否已被 rehash 從而決定操作 old_hashtable 還是操作 primary_hashtable。 這種策略使主執行緒和rehash執行緒之間的競爭限制在雜湊槽內,最小化rehash操作對讀寫操作的影響,這是最理想的實現方式。但由於作者才疏學淺無法使用 golang 實現該策略故忍痛放棄(主要原因在於 golang 沒有 volatile 關鍵字, 保證執行緒可見性的操作非常複雜),歡迎各位讀者討論。 本文采用在 sync.map 釋出前 golang 社群廣泛使用的分段鎖策略。我們將key分散到固定數量的 shard 中避免 rehash 操作。shard 是有鎖保護的 map, 當 shard 進行 rehash 時會阻塞shard內的讀寫,但不會對其他 shard 造成影響。 這種策略簡單可靠易於實現,但由於需要兩次 hash 效能略差。這個 dict 完整原始碼在[Github](https://github.com/HDT3213/godis/blob/master/src/datastruct/dict/concurrent.go) 可以獨立使用(雖然也沒有什麼用。。。)。 定義資料結構: ```golang type ConcurrentDict struct { table []*Shard count int32 } type Shard struct { m map[string]interface{} mutex sync.RWMutex } ``` 在構造時初始化 shard,這個操作相對比較耗時: ```golang func computeCapacity(param int) (size int) { if param <= 16 { return 16 } n := param - 1 n |= n >> 1 n |= n >> 2 n |= n >> 4 n |= n >> 8 n |= n >> 16 if n < 0 { return math.MaxInt32 } else { return int(n + 1) } } func MakeConcurrent(shardCount int) *ConcurrentDict { shardCount = computeCapacity(shardCount) table := make([]*Shard, shardCount) for i := 0; i < shardCount; i++ { table[i] = &Shard{ m: make(map[string]interface{}), } } d := &ConcurrentDict{ count: 0, table: table, } return d } ``` 雜湊演算法選擇FNV演算法: ```golang const prime32 = uint32(16777619) func fnv32(key string) uint32 { hash := uint32(2166136261) for i := 0; i < len(key); i++ { hash *= prime32 hash ^= uint32(key[i]) } return hash } ``` 定位shard, 當n為2的整數冪時 h % n == (n - 1) & h ```golang func (dict *ConcurrentDict) spread(hashCode uint32) uint32 { if dict == nil { panic("dict is nil") } tableSize := uint32(len(dict.table)) return (tableSize - 1) & uint32(hashCode) } func (dict *ConcurrentDict) getShard(index uint32) *Shard { if dict == nil { panic("dict is nil") } return dict.table[index] } ``` Get 和 Put 方法實現: ```golang func (dict *ConcurrentDict) Get(key string) (val interface{}, exists bool) { if dict == nil { panic("dict is nil") } hashCode := fnv32(key) index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.RLock() defer shard.mutex.RUnlock() val, exists = shard.m[key] return } func (dict *ConcurrentDict) Len() int { if dict == nil { panic("dict is nil") } return int(atomic.LoadInt32(&dict.count)) } // return the number of new inserted key-value func (dict *ConcurrentDict) Put(key string, val interface{}) (result int) { if dict == nil { panic("dict is nil") } hashCode := fnv32(key) index := dict.spread(hashCode) shard := dict.getShard(index) shard.mutex.Lock() defer shard.mutex.Unlock() if _, ok := shard.m[key]; ok { shard.m[key] = val return 0 } else { shard.m[key] = val dict.addCount() return 1 } } ``` # LockMap 上一節實現的ConcurrentMap 可以保證對單個 key 操作的併發安全性,但是仍然無法滿足需求: 1. MSETNX 命令當且僅當所有給定鍵都不存在時所有給定鍵設定值, 因此我們需要鎖定所有給定的鍵直到完成所有鍵的檢查和設定 2. LPOP 命令移除列表中最後一個元素後需要移除該鍵值對,因此我們鎖定該鍵直到移除元素並移除空列表 因此我們需要實現 db.Locker 用於鎖定一個或一組 key 並在我們需要的時候釋放鎖。 實現 db.Locker 最直接的想法是使用一個 `map[string]*sync.RWMutex`, 加鎖過程分為兩步: 初始化對應的鎖 -> 加鎖, 解鎖過程也分為兩步: 解鎖 -> 釋放對應的鎖。那麼存在一個無法解決的併發問題: | 時間 | 協程A | 協程B | |:-:|:-:|:-:| | 1 | | locker["a"].Unlock() | | 2 | locker["a"] = &sync.RWMutex{} | | | 3 | | delete(locker["a"]) | | 4 | locker["a"].Lock() | | 由於 t3 時協程B釋放了鎖,t4 時協程A試圖加鎖會失敗。 若我們在解鎖時不釋放鎖就可以避免該異常的發生,但是每個曾經使用過的鎖都無法釋放從而造成嚴重的記憶體洩露。 我們注意到雜湊表的長度遠少於可能的鍵的數量,反過來說多個鍵可以共用一個雜湊槽。若我們不為單個鍵加鎖而是為它所在的雜湊槽加鎖,因為雜湊槽的數量非常少即使不釋放鎖也不會佔用太多記憶體。 作者根據這種思想實現了 [LockerMap](https://github.com/HDT3213/godis/blob/master/src/datastruct/lock/lock_map.go) 來解決併發控制問題。 ```golang type Locks struct { table []*sync.RWMutex } func Make(tableSize int) *Locks { table := make([]*sync.RWMutex, tableSize) for i := 0; i < tableSize; i++ { table[i] = &sync.RWMutex{} } return &Locks{ table: table, } } func (locks *Locks)Lock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.Lock() } func (locks *Locks)UnLock(key string) { index := locks.spread(fnv32(key)) mu := locks.table[index] mu.Unlock() } ``` 雜湊演算法已經在Dict一節介紹過不再贅述。 在鎖定多個key時需要注意,若協程A持有鍵a的鎖試圖獲得鍵b的鎖,此時協程B持有鍵b的鎖試圖獲得鍵a的鎖則會形成死鎖。 解決方法是所有協程都按照相同順序加鎖,若兩個協程都想獲得鍵a和鍵b的鎖,那麼必須先獲取鍵a的鎖後獲取鍵b的鎖,這樣就可以避免迴圈等待。 ``` func (locks *Locks)Locks(keys ...string) { keySlice := make(sort.StringSlice, len(keys)) copy(keySlice, keys) sort.Sort(keySlice) for _, key := range keySlice { locks.Lock(key) } } func (locks *Locks)RLocks(keys ...string) { keySlice := make(sort.StringSlice, len(keys)) copy(keySlice, keys) sort.Sort(keySlice) for _, key := range keySlice { locks.RLock(key) } } ``` # TTL Time To Live (TTL) 的實現方式非常簡單,其核心是 string -> time 雜湊表。 當訪問某個 key 時會檢查是否過期,並刪除過期key: ```golang func (db *DB) Get(key string) (*DataEntity, bool) { db.stopWorld.RLock() defer db.stopWorld.RUnlock() raw, ok := db.Data.Get(key) if !ok { return nil, false } if db.IsExpired(key) { return nil, false } entity, _ := raw.(*DataEntity) return entity, true } func (db *DB) IsExpired(key string) bool { rawExpireTime, ok := db.TTLMap.Get(key) if !ok { return false } expireTime, _ := rawExpireTime.(time.Time) expired := time.Now().After(expireTime) if expired { db.Remove(key) } return expired } ``` 同時會定時的檢查過期key並刪除: ```golang func (db *DB) CleanExpired() { now := time.Now() toRemove := &List.LinkedList{} db.TTLMap.ForEach(func(key string, val interface{}) bool { expireTime, _ := val.(time.Time) if now.After(expireTime) { // expired db.Data.Remove(key) toRemove.Add(key) } return true }) toRemove.ForEach(func(i int, val interface{}) bool { key, _ := val.(string) db.TTLMap.Remove(key) return true }) } func (db *DB) TimerTask() { ticker := time.NewTicker(db.interval) go func() { for range ticker.C { db.CleanExpired() } }