1. 程式人生 > 程式設計 >理解 Go 標準庫中的 atomic.Value 型別

理解 Go 標準庫中的 atomic.Value 型別

作者:喵叔
原文:blog.betacat.io/post/golang…

在 Go 語言標準庫中,sync/atomic包將底層硬體提供的原子操作封裝成了 Go 的函式。但這些操作只支援幾種基本資料型別,因此為了擴大原子操作的適用範圍,Go 語言在 1.4 版本的時候向sync/atomic包中添加了一個新的型別Value。此型別的值相當於一個容器,可以被用來“原子地"儲存(Store)和載入(Load)任意型別的值。

歷史起源

我在golang-dev郵件列表中翻到了14年的這段討論,有使用者報告了encoding/gob包在多核機器上(80-core)上的效能問題,認為encoding/gob之所以不能完全利用到多核的特性是因為它裡面使用了大量的互斥鎖(mutex),如果把這些互斥鎖換成用atomic.LoadPointer/StorePointer

來做併發控制,那效能將能提升20倍。

針對這個問題,有人提議在已有的atomic包的基礎上封裝出一個atomic.Value型別,這樣使用者就可以在不依賴 Go 內部型別unsafe.Pointer的情況下使用到atomic提供的原子操作。所以我們現在看到的atomic包中除了atomic.Value外,其餘都是早期由彙編寫成的,並且atomic.Value型別的底層實現也是建立在已有的atomic包的基礎上。

那為什麼在上面的場景中,atomic會比mutex效能好很多呢?作者 Dmitry Vyukov 總結了這兩者的一個區別:

Mutexes do no scale. Atomic loads do.

Mutex作業系統實現,而atomic包中的原子操作則由底層硬體直接提供支援。在 CPU 實現的指令集裡,有一些指令被封裝進了atomic包,這些指令在執行的過程中是不允許中斷(interrupt)的,因此原子操作可以在lock-free的情況下保證併發安全,並且它的效能也能做到隨 CPU 個數的增多而線性擴充套件。

好了,說了這麼多的原子操作,我們先來看看什麼樣的操作能被叫做原子操作

原子性

一個或者多個操作在 CPU 執行的過程中不被中斷的特性,稱為原子性(atomicity) 。這些操作對外表現成一個不可分割的整體,他們要麼都執行,要麼都不執行,外界不會看到他們只執行到一半的狀態。而在現實世界中,CPU 不可能不中斷的執行一系列操作,但如果我們在執行多個操作時,能讓他們的中間狀態對外不可見

,那我們就可以宣稱他們擁有了"不可分割”的原子性。

有些朋友可能不知道,在 Go(甚至是大部分語言)中,一條普通的賦值語句其實不是一個原子操作。例如,在32位機器上寫int64型別的變數就會有中間狀態,因為它會被拆成兩次寫操作(MOV)——寫低 32 位和寫高 32 位,如下圖所示:

64位變數的賦值操作

如果一個執行緒剛寫完低32位,還沒來得及寫高32位時,另一個執行緒讀取了這個變數,那它得到的就是一個毫無邏輯的中間變數,這很有可能使我們的程式出現詭異的 Bug。

這還只是一個基礎型別,如果我們對一個結構體進行賦值,那它出現併發問題的概率就更高了。很可能寫執行緒剛寫完一小半的欄位,讀執行緒就來讀取這個變數,那麼就只能讀到僅修改了一部分的值。這顯然破壞了變數的完整性,讀出來的值也是完全錯誤的。

面對這種多執行緒下變數的讀寫問題,我們的主角——atomic.Value登場了,它使得我們可以不依賴於不保證相容性的unsafe.Pointer型別,同時又能將任意資料型別的讀寫操作封裝成原子性操作(讓中間狀態對外不可見)。

使用姿勢

atomic.Value型別對外暴露的方法就兩個:

  • v.Store(c) - 寫操作,將原始的變數c存放到一個atomic.Value型別的v裡。
  • c = v.Load() - 讀操作,從執行緒安全的v中讀取上一步存放的內容。

簡潔的介面使得它的使用也很簡單,只需將需要作併發保護的變數讀取和賦值操作用Load()Store()代替就行了。

下面是一個常見的使用場景:應用程式定期的從外界獲取最新的配置資訊,然後更改自己記憶體中維護的配置變數。工作執行緒根據最新的配置來處理請求。

package main

import (
  "sync/atomic"
  "time"
)

func loadConfig() map[string]string {
  // 從資料庫或者檔案系統中讀取配置資訊,然後以map的形式存放在記憶體裡
  return make(map[string]string)
}

func requests() chan int {
  // 將從外界中接受到的請求放入到channel裡
  return make(chan int)
}

func main() {
  // config變數用來存放該服務的配置資訊
  var config atomic.Value
  // 初始化時從別的地方載入配置檔案,並存到config變數裡
  config.Store(loadConfig())
  go func() {
    // 每10秒鐘定時的拉取最新的配置資訊,並且更新到config變數裡
    for {
      time.Sleep(10 * time.Second)
      // 對應於賦值操作 config = loadConfig()
      config.Store(loadConfig())
    }
  }()
  // 建立工作執行緒,每個工作執行緒都會根據它所讀取到的最新的配置資訊來處理請求
  for i := 0; i < 10; i++ {
    go func() {
      for r := range requests() {
        // 對應於取值操作 c := config
        // 由於Load()返回的是一個interface{}型別,所以我們要先強制轉換一下
        c := config.Load().(map[string]string)
        // 這裡是根據配置資訊處理請求的邏輯...
        _,_ = r,c
      }
    }()
  }
}
複製程式碼

內部實現

羅永浩浩曾說過:

Simplicity is the hidden complexity

我們來看看在簡單的外表下,它到底有哪些 hidden complexity。

資料結構

atomic.Value被設計用來儲存任意型別的資料,所以它內部的欄位是一個interface{}型別,非常的簡單粗暴。

type Value struct {
  v interface{}
}
複製程式碼

除了Value外,這個檔案裡還定義了一個ifaceWords型別,這其實是一個空interface (interface{})的內部表示格式(參見runtime/runtime2.go中eface的定義)。它的作用是將interface{}型別分解,得到其中的兩個欄位。

type ifaceWords struct {
  typ  unsafe.Pointer
  data unsafe.Pointer
}
複製程式碼

寫入(Store)操作

在介紹寫入之前,我們先來看一下 Go 語言內部的unsafe.Pointer型別。

unsafe.Pointer

出於安全考慮,Go 語言並不支援直接操作記憶體,但它的標準庫中又提供一種不安全(不保證向後相容性) 的指標型別unsafe.Pointer,讓程式可以靈活的操作記憶體。

unsafe.Pointer的特別之處在於,它可以繞過 Go 語言型別系統的檢查,與任意的指標型別互相轉換。也就是說,如果兩種型別具有相同的記憶體結構(layout),我們可以將unsafe.Pointer當做橋樑,讓這兩種型別的指標相互轉換,從而實現同一份記憶體擁有兩種不同的解讀方式。

比如說,[]bytestring其實內部的儲存結構都是一樣的,但 Go 語言的型別系統禁止他倆互換。如果藉助unsafe.Pointer,我們就可以實現在零拷貝的情況下,將[]byte陣列直接轉換成string型別。

bytes := []byte{104,101,108,111}

p := unsafe.Pointer(&bytes) //強制轉換成unsafe.Pointer,編譯器不會報錯
str := *(*string)(p) //然後強制轉換成string型別的指標,再將這個指標的值當做string型別取出來
fmt.Println(str) //輸出 "hello"
複製程式碼

知道了unsafe.Pointer的作用,我們可以直接來看程式碼了:

func (v *Value) Store(x interface{}) {
  if x == nil {
    panic("sync/atomic: store of nil value into Value")
  }
  vp := (*ifaceWords)(unsafe.Pointer(v))  // Old value
  xp := (*ifaceWords)(unsafe.Pointer(&x)) // New value
  for {
    typ := LoadPointer(&vp.typ)
    if typ == nil {
      // Attempt to start first store.
      // Disable preemption so that other goroutines can use
      // active spin wait to wait for completion; and so that
      // GC does not see the fake type accidentally.
      runtime_procPin()
      if !CompareAndSwapPointer(&vp.typ,nil,unsafe.Pointer(^uintptr(0))) {
        runtime_procUnpin()
        continue
      }
      // Complete first store.
      StorePointer(&vp.data,xp.data)
      StorePointer(&vp.typ,xp.typ)
      runtime_procUnpin()
      return
    }
    if uintptr(typ) == ^uintptr(0) {
      // First store in progress. Wait.
      // Since we disable preemption around the first store,
      // we can wait with active spinning.
      continue
    }
    // First store completed. Check type and overwrite data.
    if typ != xp.typ {
      panic("sync/atomic: store of inconsistently typed value into Value")
    }
    StorePointer(&vp.data,xp.data)
    return
  }
}
複製程式碼

大概的邏輯:

  • 第5~6行 - 通過unsafe.Pointer現有的要寫入的值分別轉成ifaceWords型別,這樣我們下一步就可以得到這兩個interface{}的原始型別(typ)和真正的值(data)。
  • 從第7行開始就是一個無限 for 迴圈。配合CompareAndSwap食用,可以達到樂觀鎖的功效。
  • 第8行,我們可以通過LoadPointer這個原子操作拿到當前Value中儲存的型別。下面根據這個型別的不同,分3種情況處理。
  1. 第一次寫入(第9~24行) - 一個Value例項被初始化後,它的typ欄位會被設定為指標的零值 nil,所以第9行先判斷如果typ是 nil 那就證明這個Value還未被寫入過資料。那之後就是一段初始寫入的操作:
    • runtime_procPin()這是runtime中的一段函式,具體的功能我不是特別清楚,也沒有找到相關的檔案。這裡猜測一下,一方面它禁止了排程器對當前 goroutine 的搶佔(preemption),使得它在執行當前邏輯的時候不被打斷,以便可以儘快地完成工作,因為別人一直在等待它。另一方面,在禁止搶佔期間,GC 執行緒也無法被啟用,這樣可以防止 GC 執行緒看到一個莫名其妙的指向^uintptr(0)的型別(這是賦值過程中的中間狀態)。
    • 使用CAS操作,先嚐試將typ設定為^uintptr(0)這個中間狀態。如果失敗,則證明已經有別的執行緒搶先完成了賦值操作,那它就解除搶佔鎖,然後重新回到 for 迴圈第一步。
    • 如果設定成功,那證明當前執行緒搶到了這個"樂觀鎖",它可以安全的把v設為傳入的新值了(19~23行)。注意,這裡是先寫data欄位,然後再寫typ欄位。因為我們是以typ欄位的值作為寫入完成與否的判斷依據的。
  2. 第一次寫入還未完成(第25~30行)- 如果看到typ欄位還是^uintptr(0)這個中間型別,證明剛剛的第一次寫入還沒有完成,所以它會繼續迴圈,"忙等"到第一次寫入完成。
  3. 第一次寫入已完成(第31行及之後) - 首先檢查上一次寫入的型別與這一次要寫入的型別是否一致,如果不一致則丟擲異常。反之,則直接把這一次要寫入的值寫入到data欄位。

這個邏輯的主要思想就是,為了完成多個欄位的原子性寫入,我們可以抓住其中的一個欄位,以它的狀態來標誌整個原子寫入的狀態。這個想法我在 TiDB 的事務實現中看到過類似的,他們那邊叫Percolator模型,主要思想也是先選出一個primaryRow,然後所有的操作也是以primaryRow的成功與否作為標誌。嗯,果然是太陽底下沒有新東西。

如果沒有耐心看程式碼,沒關係,這兒還有個簡化版的流程圖:

atomic.Value Store 流程

讀取(Load)操作

先上程式碼:

func (v *Value) Load() (x interface{}) {
  vp := (*ifaceWords)(unsafe.Pointer(v))
  typ := LoadPointer(&vp.typ)
  if typ == nil || uintptr(typ) == ^uintptr(0) {
    // First store not yet completed.
    return nil
  }
  data := LoadPointer(&vp.data)
  xp := (*ifaceWords)(unsafe.Pointer(&x))
  xp.typ = typ
  xp.data = data
  return
}
複製程式碼

讀取相對就簡單很多了,它有兩個分支:

  1. 如果當前的typ是 nil 或者^uintptr(0),那就證明第一次寫入還沒有開始,或者還沒完成,那就直接返回 nil (不對外暴露中間狀態)。
  2. 否則,根據當前看到的typdata構造出一個新的interface{}返回出去。

總結

本文從郵件列表中的一段討論開始,介紹了atomic.Value的被提出來的歷史緣由。然後由淺入深的介紹了它的使用姿勢,以及內部實現。讓大家不僅知其然,還能知其所以然。

另外,再強調一遍,原子操作由底層硬體支援,而鎖則由作業系統提供的 API 實現。若實現相同的功能,前者通常會更有效率,並且更能利用計算機多核的優勢。所以,以後當我們想併發安全的更新一些變數的時候,我們應該優先選擇用atomic.Value來實現。