go sync.map原始碼解析
阿新 • • 發佈:2020-12-21
go中的map是併發不安全的,同時多個協程讀取不會出現問題,但是多個協程 同時讀寫就會出現 fatalerror:concurrentmapreadandmapwrite的錯誤。通用的解決辦法如下:
1. 加鎖
1.1 通用鎖
import "sync" type SafeMap struct { data map[string]string lock sync.Mutex } func (this *SafeMap) get(key string) string{ this.lock.Lock() defer this.lock.Unlock() return this.data[key] } func (this *SafeMap) set(key, value string) { this.lock.Lock() defer this.lock.Unlock() this.data[key] = value }
1.2 讀寫鎖
import "sync" type SafeMap struct { data map[string]string lock sync.RWMutex } func (this *SafeMap) get(key string) string{ this.lock.RLock() defer this.lock.RUnlock() return this.data[key] } func (this *SafeMap) set(key, value string) { this.lock.Lock() defer this.lock.Unlock() this.data[key] = value }
1.3 在go1.9之後,go引入了併發安全的map: sync.map
sync.map的原理可以概括為:
1. 通過read和dirty兩個欄位將讀寫分離,讀的資料存在於read欄位的,最新寫的資料位於dirty欄位上。
2. 讀取時先查詢read,不存在時查詢dirty,寫入時只寫入dirty
3. 讀取read不需要加鎖,而讀或寫dirty需要加鎖
4. 使用misses欄位來統計read被穿透的次數,超過一定次數將資料從dirty同步到read上
5. 刪除資料通過標記來延遲刪除
sync.Map結構如下所示:
type Map struct { mu Mutex //加鎖,寶座dirty欄位 read atomic.Value // 只讀資料,例項型別為 readOnly dirty map[interface{}]*entry //最新寫入的資料 misses int //read被穿透的次數 }
readOnly結構
type readOnly struct { m map[interface{}]*entry amended bool // true if the dirty map contains some key not in m. }
entery結構
type entry struct { // p == nil entry已經被刪除且 dirty == nil
// p == expunged entry已經被刪除,但是dirty != nil且dirty中不存在該元素,這種情況出現於重建dirty時,將read複製到dirty中,複製的過程中將nil標記為expunged,不將其複製到dirty
// 除此之外,entry存在於read中,如果dirty != nil則也存在於dirty中
p unsafe.Pointer // *interface{} }
Load()方法
func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
//首先嚐試從read中讀取 readOnly物件 read, _ := m.read.Load().(readOnly) e, ok := read.m[key]
//如果不存在則嘗試從dirty中讀取 if !ok && read.amended { m.mu.Lock() //再讀取一次read中內容,主要是用於防止上一步加鎖過程中dirty map轉換為read map導致dirty中讀取不到資料 read, _ = m.read.Load().(readOnly) e, ok = read.m[key]
//如果確實不存在,則從dirty中讀取 if !ok && read.amended { e, ok = m.dirty[key] // 不管dirty中存不存在,都將miss + 1, 如果misses值等於dirty中元素個數,就會把dirty中元素遷移到read中 m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load() }
Store()方法
// Store sets the value for a key. func (m *Map) Store(key, value interface{}) {
//直接再read中查詢 read, _ := m.read.Load().(readOnly)
//如果找到了,直接更新read中值,返回 if e, ok := read.m[key]; ok && e.tryStore(&value) { return } //如不存在,去dirty中讀 m.mu.Lock()
//二次檢測 read, _ = m.read.Load().(readOnly)
//如果此時讀到,read中不允許直接的新增刪除值,此種情況說明加鎖之前存在dirty升級為read的操作 if e, ok := read.m[key]; ok {
//如果讀到的值為expunged, 說明生成dirty時,複製read中的元素,對於nil的元素,搞成了expunged,所以意味著dirty不為nil,且dirty中沒有該元素 if e.unexpungeLocked() { // The entry was previously expunged, which implies that there is a // non-nil dirty map and this entry is not in it.
//更新dirty中的值 m.dirty[key] = e }
//更新read中的值 e.storeLocked(&value)
//此時,read中沒有該元素,需要更新dirty中的值 } else if e, ok := m.dirty[key]; ok { e.storeLocked(&value) } else {
// 如果 !read.amended, 說明dirty為nil, 需要將read map複製一份到dirty map if !read.amended { // We're adding the first new key to the dirty map. // Make sure it is allocated and mark the read-only map as incomplete. m.dirtyLocked()
//設定read.amended == true m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() }
LoadOrStoce()
// LoadOrStore returns the existing value for the key if present. // Otherwise, it stores and returns the given value. // The loaded result is true if the value was loaded, false if stored. func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) { // Avoid locking if it's a clean hit.
//讀取read中是否存在該key read, _ := m.read.Load().(readOnly) if e, ok := read.m[key]; ok {
//如果存在(是否標識為刪除由tryLoadOrStore處理),嘗試獲取該元素的值,或者將值寫入 actual, loaded, ok := e.tryLoadOrStore(value) if ok { return actual, loaded } } m.mu.Lock()
//二次檢測 read, _ = m.read.Load().(readOnly)
//如果此時讀到,read中不允許直接的新增刪除值,此種情況說明加鎖之前存在dirty升級為read的操作 if e, ok := read.m[key]; ok {
//如果讀到的值為expunged, 說明生成dirty時,複製read中的元素,對於nil的元素,搞成了expunged,所以意味著dirty不為nil,且dirty中沒有該元素 if e.unexpungeLocked() { m.dirty[key] = e }
//如果存在(是否標識為刪除由tryLoadOrStore處理),嘗試獲取該元素的值,或者將值寫入 actual, loaded, _ = e.tryLoadOrStore(value)
// 此時,read中沒有元素,需要 tryLoadOrStore dirty中值 } else if e, ok := m.dirty[key]; ok { actual, loaded, _ = e.tryLoadOrStore(value) m.missLocked() } else {
// 如果 !read.amended, 說明dirty為nil, 需要將read map複製一份到dirty map if !read.amended { // We're adding the first new key to the dirty map. // Make sure it is allocated and mark the read-only map as incomplete. m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) }
// 將值寫入dirty中 m.dirty[key] = newEntry(value) actual, loaded = value, false } m.mu.Unlock() return actual, loaded }
// tryLoadOrStore atomically loads or stores a value if the entry is not // expunged. // // If the entry is expunged, tryLoadOrStore leaves the entry unchanged and // returns with ok==false. // 如果元素是 expunged, tryLoadOrStore 保持entry不變並直接返回false func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) { p := atomic.LoadPointer(&e.p) // 標識刪除,直接返回 if p == expunged { return nil, false, false } // 如果元素存在真實值,則直接返回該真實值 if p != nil { return *(*interface{})(p), true, true } // Copy the interface after the first load to make this method more amenable // to escape analysis: if we hit the "load" path or the entry is expunged, we // shouldn't bother heap-allocating. // 如果 p == nil, 則更新該元素值 ic := i for { if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) { return i, false, true } p = atomic.LoadPointer(&e.p) if p == expunged { return nil, false, false } if p != nil { return *(*interface{})(p), true, true } } }
Delete()方法
// Delete deletes the value for a key. func (m *Map) Delete(key interface{}) {
// 檢查read中是否存在 read, _ := m.read.Load().(readOnly) e, ok := read.m[key]
// 如果不存在,並且dirty中存在元素 if !ok && read.amended { m.mu.Lock()
// 二次檢測 read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended {
// dirty中刪除 delete(m.dirty, key) } m.mu.Unlock() } if ok {
// 如果存在,直接刪除 e.delete() } } func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
Range()方法
// Range calls f sequentially for each key and value present in the map. // If f returns false, range stops the iteration. // // Range does not necessarily correspond to any consistent snapshot of the Map's // contents: no key will be visited more than once, but if the value for any key // is stored or deleted concurrently, Range may reflect any mapping for that key // from any point during the Range call. // // Range may be O(N) with the number of elements in the map even if f returns // false after a constant number of calls. func (m *Map) Range(f func(key, value interface{}) bool) { // We need to be able to iterate over all of the keys that were already // present at the start of the call to Range. // If read.amended is false, then read.m satisfies that property without // requiring us to hold m.mu for a long time. read, _ := m.read.Load().(readOnly) // 如果 amended == true, 說明dirty中存在元素,且包含所有有效元素,此時,使用dirty map if read.amended { // m.dirty contains keys not in read.m. Fortunately, Range is already O(N) // (assuming the caller does not break out early), so a call to Range // amortizes an entire copy of the map: we can promote the dirty copy // immediately! m.mu.Lock() read, _ = m.read.Load().(readOnly) if read.amended {
//使用dirty map並將其升級為 read map read = readOnly{m: m.dirty} m.read.Store(read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } // 使用read map讀 for k, e := range read.m { v, ok := e.load()
// 被刪除的不計入 if !ok { continue } if !f(k, v) { break } } }
當sync.Map中存在大量寫操作的情況下,會導致read中讀不到資料,依然會頻繁加鎖,同時dirty升級為read,整體效能就會很低,所以sync.Map更加適合大量讀、少量寫的場景。