Golang 實現 Redis(5): 使用跳錶實現 SortedSet
阿新 • • 發佈:2020-05-09
本文是使用 golang 實現 redis 系列的第五篇, 將介紹如何使用跳錶實現有序集合(SortedSet)的相關功能。
跳錶(skiplist) 是 Redis 中 SortedSet 資料結構的底層實現, 跳錶優秀的範圍查詢能力為`ZRange`和`ZRangeByScore`等命令提供了支援。
本文完整原始碼在Github[HDT3213/godis](https://github.com/HDT3213/godis/tree/master/src/datastruct/sortedset)
# 結構定義
實現`ZRange`命令最簡單的資料結構是有序連結串列:
![](https://img2020.cnblogs.com/blog/793413/202005/793413-20200503230726387-221850327.png)
在有序連結串列上實現`ZRange key start end`命令需要進行`end`次查詢, 即時間複雜度為 *O(n)*
跳錶的優化思路是新增上層連結串列,上層連結串列中會跳過一些節點。如圖所示:
![](https://img2020.cnblogs.com/blog/793413/202005/793413-20200503230748021-1515963822.png)
在有兩層的跳錶中,搜尋的時間複雜度降低為了*O(n / 2)*。以此類推在有 log2(n) 層的跳錶中,搜尋元素的時間複雜度為*O(log n)*。
瞭解資料結構之後,可以定義相關的型別了:
```go
// 對外的元素抽象
type Element struct {
Member string
Score float64
}
type Node struct {
Element // 元素的名稱和 score
backward *Node // 後向指標
level []*Level // 前向指標, level[0] 為最下層
}
// 節點中每一層的抽象
type Level struct {
forward *Node // 指向同層中的下一個節點
span int64 // 到 forward 跳過的節點數
}
// 跳錶的定義
type skiplist struct {
header *Node
tail *Node
length int64
level int16
}
```
用一張圖來表示一下:
![](https://img2020.cnblogs.com/blog/793413/202005/793413-20200503230906423-1276062392.png)
# 查詢節點
有了上文的描述查詢節點的邏輯不難實現, 以 RangeByRank 的核心邏輯為例:
```go
// 尋找排名為 rank 的節點, rank 從1開始
func (skiplist *skiplist) getByRank(rank int64)*Node {
var i int64 = 0
n := skiplist.header
// 從頂層向下查詢
for level := skiplist.level - 1; level >= 0; level-- {
// 從當前層向前搜尋
// 若當前層的下一個節點已經超過目標 (i+n.level[level].span > rank),則結束當前層搜尋進入下一層
for n.level[level].forward != nil && (i+n.level[level].span) <= rank {
i += n.level[level].span
n = n.level[level].forward
}
if i == rank {
return n
}
}
return nil
}
```
`ZRangeByScore` 命令需要 `getFirstInScoreRange` 函式找到分數範圍內第一個節點:
```go
func (skiplist *skiplist) getFirstInScoreRange(min *ScoreBorder, max *ScoreBorder) *Node {
// 判斷跳錶和範圍是否有交集,若無交集提早返回
if !skiplist.hasInRange(min, max) {
return nil
}
n := skiplist.header
// 從頂層向下查詢
for level := skiplist.level - 1; level >= 0; level-- {
// 若 forward 節點仍未進入範圍則繼續向前(forward)
// 若 forward 節點已進入範圍,當 level > 0 時 forward 節點不能保證是 *第一個* 在 min 範圍內的節點, 因此需進入下一層查詢
for n.level[level].forward != nil && !min.less(n.level[level].forward.Score) {
n = n.level[level].forward
}
}
// 當從外層迴圈退出時 level=0 (最下層), n.level[0].forward 一定是 min 範圍內的第一個節點
n = n.level[0].forward
if !max.greater(n.Score) {
return nil
}
return n
}
```
# 插入節點
插入節點的操作比較多,我們以註釋的方式進行說明:
```golang
func (skiplist *skiplist)insert(member string, score float64)*Node {
// 尋找新節點的先驅節點,它們的 forward 將指向新節點
// 因為每層都有一個 forward 指標, 所以每層都會對應一個先驅節點
// 找到這些先驅節點並儲存在 update 陣列中
update := make([]*Node, maxLevel)
rank := make([]int64, maxLevel) // 儲存各層先驅節點的排名,用於計算span
node := skiplist.header
for i := skiplist.level - 1; i >= 0; i-- { // 從上層向下尋找
// 初始化 rank
if i == skiplist.level - 1 {
rank[i] = 0
} else {
rank[i] = rank[i + 1]
}
if node.level[i] != nil {
// 遍歷搜尋
for node.level[i].forward != nil &&
(node.level[i].forward.Score < score ||
(node.level[i].forward.Score == score && node.level[i].forward.Member < member)) { // same score, different key
rank[i] += node.level[i].span
node = node.level[i].forward
}
}
update[i] = node
}
level := randomLevel() // 隨機決定新節點的層數
// 可能需要建立新的層
if level > skiplist.level {
for i := skiplist.level; i < level; i++ {
rank[i] = 0
update[i] = skiplist.header
update[i].level[i].span = skiplist.length
}
skiplist.level = level
}
// 建立新節點並插入跳錶
node = makeNode(level, score, member)
for i := int16(0); i < level; i++ {
// 新節點的 forward 指向先驅節點的 forward
node.level[i].forward = update[i].level[i].forward
// 先驅節點的 forward 指向新節點
update[i].level[i].forward = node
// 計算先驅節點和新節點的 span
node.level[i].span = update[i].level[i].span - (rank[0] - rank[i])
update[i].level[i].span = (rank[0] - rank[i]) + 1
}
// 新節點可能不會包含所有層
// 對於沒有層,先驅節點的 span 會加1 (後面插入了新節點導致span+1)
for i := level; i < skiplist.level; i++ {
update[i].level[i].span++
}
// 更新後向指標
if update[0] == skiplist.header {
node.backward = nil
} else {
node.backward = update[0]
}
if node.level[0].forward != nil {
node.level[0].forward.backward = node
} else {
skiplist.tail = node
}
skiplist.length++
return node
}
```
randomLevel 用於隨機決定新節點包含的層數,隨機結果出現2的概率是出現1的25%, 出現3的概率是出現2的25%:
```go
func randomLevel() int16 {
level := int16(1)
for float32(rand.Int31()&0xFFFF) < (0.25 * 0xFFFF) {
level++
}
if level < maxLevel {
return level
}
return maxLevel
}
```
# 刪除節點
刪除節點的思路與插入節點基本一致:
```go
// 刪除操作可能一次刪除多個節點
func (skiplist *skiplist) RemoveRangeByRank(start int64, stop int64)(removed []*Element) {
var i int64 = 0 // 當前指標的排名
update := make([]*Node, maxLevel)
removed = make([]*Element, 0)
// 從頂層向下尋找目標的先驅節點
node := skiplist.header
for level := skiplist.level - 1; level >= 0; level-- {
for node.level[level].forward != nil && (i+node.level[level].span) < start {
i += node.level[level].span
node = node.level[level].forward
}
update[level] = node
}
i++
node = node.level[0].forward // node 是目標範圍內第一個節點
// 刪除範圍內的所有節點
for node != nil && i < stop {
next := node.level[0].forward
removedElement := node.Element
removed = append(removed, &removedElement)
skiplist.removeNode(node, update)
node = next
i++
}
return removed
}
```
接下來分析一下執行具體節點刪除操作的removeNode函式:
```go
// 傳入目標節點和刪除後的先驅節點
// 在批量刪除時我們傳入的 update 陣列是相同的
func (skiplist *skiplist) removeNode(node *Node, update []*Node) {
for i := int16(0); i < skiplist.level; i++ {
// 如果先驅節點的forward指標指向了目標節點,則需要修改先驅的forward指標跳過要刪除的目標節點
// 同時更新先驅的 span
if update[i].level[i].forward == node {
update[i].level[i].span += node.level[i].span - 1
update[i].level[i].forward = node.level[i].forward
} else {
update[i].level[i].span--
}
}
// 修改目標節點後繼節點的backward指標
if node.level[0].forward != nil {
node.level[0].forward.backward = node.backward
} else {
skiplist.tail = node.backward
}
// 必要時刪除空白的層
for skiplist.level > 1 && skiplist.header.level[skiplist.level-1].forward == nil {
skiplist.level--
}
skiplist.length