1. 程式人生 > >原始碼解讀 Golang 的 sync.Map 實現原理

原始碼解讀 Golang 的 sync.Map 實現原理

簡介

Go 的內建 map 是不支援併發寫操作的,原因是 map 寫操作不是併發安全的,當你嘗試多個 Goroutine 操作同一個 map,會產生報錯:fatal error: concurrent map writes

因此官方另外引入了 sync.Map 來滿足併發程式設計中的應用。

sync.Map 的實現原理可概括為:

  • 通過 read 和 dirty 兩個欄位將讀寫分離,讀的資料存在只讀欄位 read 上,將最新寫入的資料則存在 dirty 欄位上
  • 讀取時會先查詢 read,不存在再查詢 dirty,寫入時則只寫入 dirty
  • 讀取 read 並不需要加鎖,而讀或寫 dirty 都需要加鎖
  • 另外有 misses 欄位來統計 read 被穿透的次數(被穿透指需要讀 dirty 的情況),超過一定次數則將 dirty 資料同步到 read 上
  • 對於刪除資料則直接通過標記來延遲刪除

資料結構

Map 的資料結構如下:

type Map struct {
    // 加鎖作用,保護 dirty 欄位
    mu Mutex
    // 只讀的資料,實際資料型別為 readOnly
    read atomic.Value
    // 最新寫入的資料
    dirty map[interface{}]*entry
    // 計數器,每次需要讀 dirty 則 +1
    misses int
}

其中 readOnly 的資料結構為:

type readOnly struct {
    // 內建 map
    m  map[interface{}]*entry
    // 表示 dirty 裡存在 read 裡沒有的 key,通過該欄位決定是否加鎖讀 dirty
    amended bool
}

entry 資料結構則用於儲存值的指標:

type entry struct {
    p unsafe.Pointer  // 等同於 *interface{}
}

屬性 p 有三種狀態:

  • p == nil: 鍵值已經被刪除,且 m.dirty == nil
  • p == expunged
    : 鍵值已經被刪除,但 m.dirty!=nilm.dirty 不存在該鍵值(expunged 實際是空介面指標)
  • 除以上情況,則鍵值對存在,存在於 m.read.m 中,如果 m.dirty!=nil 則也存在於 m.dirty

Map 常用的有以下方法:

  • Load:讀取指定 key 返回 value
  • Store: 儲存(增或改)key-value
  • Delete: 刪除指定 key

原始碼解析

Load

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 首先嚐試從 read 中讀取 readOnly 物件
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]

    // 如果不存在則嘗試從 dirty 中獲取
    if !ok && read.amended {
        m.mu.Lock()
        // 由於上面 read 獲取沒有加鎖,為了安全再檢查一次
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]

        // 確實不存在則從 dirty 獲取
        if !ok && read.amended {
            e, ok = m.dirty[key]
            // 呼叫 miss 的邏輯
            m.missLocked()
        }
        m.mu.Unlock()
    }

    if !ok {
        return nil, false
    }
    // 從 entry.p 讀取值
    return e.load()
}

func (m *Map) missLocked() {
    m.misses++
    if m.misses < len(m.dirty) {
        return
    }
    // 當 miss 積累過多,會將 dirty 存入 read,然後 將 amended = false,且 m.dirty = nil
    m.read.Store(readOnly{m: m.dirty})
    m.dirty = nil
    m.misses = 0
}

Store

func (m *Map) Store(key, value interface{}) {
    read, _ := m.read.Load().(readOnly)
    // 如果 read 裡存在,則嘗試存到 entry 裡
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }

    // 如果上一步沒執行成功,則要分情況處理
    m.mu.Lock()
    read, _ = m.read.Load().(readOnly)
    // 和 Load 一樣,重新從 read 獲取一次
    if e, ok := read.m[key]; ok {
        // 情況 1:read 裡存在
        if e.unexpungeLocked() {
            // 如果 p == expunged,則需要先將 entry 賦值給 dirty(因為 expunged 資料不會留在 dirty)
            m.dirty[key] = e
        }
        // 用值更新 entry
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 情況 2:read 裡不存在,但 dirty 裡存在,則用值更新 entry
        e.storeLocked(&value)
    } else {
        // 情況 3:read 和 dirty 裡都不存在
        if !read.amended {
            // 如果 amended == false,則呼叫 dirtyLocked 將 read 拷貝到 dirty(除了被標記刪除的資料)
            m.dirtyLocked()
            // 然後將 amended 改為 true
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 將新的鍵值存入 dirty
        m.dirty[key] = newEntry(value)
    }
    m.mu.Unlock()
}

func (e *entry) tryStore(i *interface{}) bool {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
    }
}

func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func (e *entry) storeLocked(i *interface{}) {
    atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 判斷 entry 是否被刪除,否則就存到 dirty 中
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}

func (e *entry) tryExpungeLocked() (isExpunged bool) {
    p := atomic.LoadPointer(&e.p)
    for p == nil {
        // 如果有 p == nil(即鍵值對被 delete),則會在這個時機被置為 expunged
        if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
            return true
        }
        p = atomic.LoadPointer(&e.p)
    }
    return p == expunged
}

Delete

func (m *Map) Delete(key interface{}) {
    m.LoadAndDelete(key)
}

// LoadAndDelete 作用等同於 Delete,並且會返回值與是否存在
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) {
    // 獲取邏輯和 Load 類似,read 不存在則查詢 dirty
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            e, ok = m.dirty[key]
            m.missLocked()
        }
        m.mu.Unlock()
    }
    // 查詢到 entry 後執行刪除
    if ok {
        // 將 entry.p 標記為 nil,資料並沒有實際刪除
        // 真正刪除資料並被被置為 expunged,是在 Store 的 tryExpungeLocked 中
        return e.delete()
    }
    return nil, false
}

總結

可見,通過這種讀寫分離的設計,解決了併發情況的寫入安全,又使讀取速度在大部分情況可以接近內建 map,非常適合讀多寫少的情況。

sync.Map 還有一些其他方法:

  • Range:遍歷所有鍵值對,引數是回撥函式
  • LoadOrStore:讀取資料,若不存在則儲存再讀取

這裡就不再詳解了,可參見 原始碼