1. 程式人生 > 其它 >執行緒互斥與同步 在c#中用mutex類實現執行緒的互斥_全面講解 goroutine 間的同步&協作...

執行緒互斥與同步 在c#中用mutex類實現執行緒的互斥_全面講解 goroutine 間的同步&協作...

技術標籤:執行緒互斥與同步 在c#中用mutex類實現執行緒的互斥

點選上方藍色“ Go語言中文網 ”關注我們, 領全套Go資料 ,每天學習Go語言

基礎概念

競態條件

一份資料被多個執行緒共享,可能會產生爭用和衝突的情況。這種情況被稱為競態條件,競態條件會破壞共享資料的一致性,影響一些執行緒中程式碼和流程的正確執行。

同步

同步可以解決競態問題。它本質上是在控制多個執行緒對共享資源的訪問。這種控制主要包含兩點:

  1. 避免多個執行緒在同一時刻操作同一個資料塊。
  2. 協調多個執行緒,以避免它們在同一時刻執行同一個程式碼塊。

在同步控制下,多個併發執行的執行緒對這個共享資源的訪問是完全序列的。對這個共享資源進行操作的程式碼片段可以視為一個臨界區。

互斥量 sync.Mutex

一個互斥鎖可以被用來保護一個臨界區或者一組相關臨界區。它可以保證,在同一時刻只有一個 goroutine 處於該臨界區之內。

每當有 goroutine 想進入臨界區時,都需要先加鎖,每個 goroutine 離開臨界區時,都要及時解鎖。

Mutex 的使用

varmutexsync.Mutex

funcupdatePublicResource(){
mutex.Lock()
doUpdate()
mutex.Unlock()
}

使用互斥鎖的注意事項:

  1. 不要重複鎖定互斥鎖。
  2. 不要忘記解鎖互斥鎖,推薦使用defer。
  3. 不要對尚未鎖定或者已解鎖的互斥鎖解鎖。
  4. 不要在多個函式之間直接傳遞互斥鎖。(即,不要複製鎖)

對一個已經被鎖定的互斥鎖進行鎖定,會阻塞當前的 goroutine 。如果其他的使用者級 goroutine 也處於等待狀態,整個程式就停止執行了,Go 語言執行時系統會丟擲一個死鎖的 panic 錯誤,程式就會崩潰。因此,切記,每一個鎖定操作,都要有且只有一個對應的解鎖操作。

讀寫鎖 sync.RWMutex

讀寫鎖是讀 / 寫互斥鎖的簡稱,讀寫鎖是互斥鎖的一種擴充套件。一個讀寫鎖中包含了兩個鎖,即:讀鎖和寫鎖。

讀寫鎖可以對共享資源的“讀操作”和“寫操作”進行區別,實現更加細膩的訪問控制。

對於某個受到讀寫鎖保護的共享資源,多個寫操作不能同時進行,寫操作和讀操作也不能同時進行,多個讀操作可以同時進行。

varmutexsync.RWMutex

funcupdatePublicResource(){
mutex.Lock()
doUpdate()
mutex.Unlock()
}

funcreadPublicResource(){
mutex.RLock()
read()
mutex.RUnlock()
}

對寫鎖進行解鎖,會喚醒“所有因試圖鎖定讀鎖,而被阻塞的 goroutine”,通常它們都能成功完成對讀鎖的鎖定。

對讀鎖進行解鎖,會在沒有其他鎖定中讀鎖的前提下,喚醒“因試圖鎖定寫鎖,而被阻塞的 goroutine”;只有一個等待時間最長的被喚醒的 goroutine 能夠成功完成對寫鎖的鎖定。

讀寫鎖是互斥鎖的擴充套件,因此有些方面它還是沿用了互斥鎖的行為模式。比如,解鎖未被鎖定的寫鎖或讀鎖,會立刻引發 panic。

條件變數 sync.Cond

條件變數是基於互斥鎖的,它不用於保護臨界區和共享資源,而是用於協調想要訪問共享資源的那些執行緒的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的執行緒。

io.Pipe 的實現就基於 sync.Cond。

sync.Cond 需要 sync.Locker 型別的引數用於初始化。

typeLockerinterface{
Lock()
Unlock()
}

noCopy

大多數同步工具禁止在使用後進行復制。Golang 使用兩個內嵌欄位實現 coCopy 功能:noCopy 和 checker。noCopy 欄位用於程式碼檢查工具,checker 欄位用於保證執行時不發生複製。

typeCondstruct{
//用於標識當前結構體在第一次使用後不應該再複製
//用於govet編譯檢查
noCopynoCopy
//Cond基於的鎖
LLocker
//一個基於ticket的通知列表
//儲存了goroutine資訊的雙向連結串列
notifynotifyList
//保證執行時發生拷貝丟擲panic
//在第一次生成時,初始化為Cond地址,如果發生複製,複製物件的地址和當前地址將會不同
checkercopyChecker
}

sync.Cond 提供 3 個方法:

  • Broadcast():喚醒所有等待 Cond 的 goroutine。不需要在鎖的保護下進行。
  • Signal():喚醒一個等待 Cond 的 goroutine。不需要在鎖的保護下進行。
  • Wait():解鎖互斥鎖,掛起當前 goroutine。當 Broadcast 或 Signal 喚醒這個 goroutine,Wait 在返回前會再鎖定互斥鎖。因此 Wait() 需要在鎖的保護下進行。
varlocksync.RWMutex
varsendCond,recvCond*sync.Cond

funcinit(){
sendCond=sync.NewCond(&lock)
recvCond=sync.NewCond(&lock)//獲取讀寫鎖中的讀鎖
}

funcsend(){
lock.Lock()
for!writeCondition(){
sendCond.Wait()
}
writeResource()
lock.Unlock()
recvCond.Signal()//如果有多個接收的goroutine就使用recvCond.Broadcast()
}

funcreceive(){
lock.Lock()
for!readCondition(){
recvCond.Wait()
}
receiveResource()
lock.Unlock()
sendCond.Signal()//如果有多個傳送的goroutine就使用
}

有時 sync.Cond 的功能用 channel 也能實現,不過 channel 的意義更多地在於傳遞資料,而 sync.Cond 的意義在於協程的協作;並且 sync.Cond 更為底層,效率更高。

Tips

  • Cond 在第一次使用後不能複製。
  • 條件變數的通知具有即時性。如果傳送通知的時候沒有 goroutine 為此等待,該通知就會被直接丟棄。
  • Signal() 和 Broadcast() 需要在非鎖定的情況下呼叫,因為 Wait() 的呼叫方處於阻塞狀態,可能錯過通知。
  • Wait() 的呼叫需要基於鎖定狀態。

sync.Cond.Wait()

func(c*Cond)Wait(){
//檢查是否發生複製
c.checker.check()
//將當前gorouitne加入當前條件變數的通知佇列
t:=runtime_notifyListAdd(&c.notify)
c.L.Unlock()
//阻塞當前的goroutine,直至收到通知
runtime_notifyListWait(&c.notify,t)
//收到通知後,加鎖,進入臨界區
c.L.Lock()
}

為什麼要由呼叫方先加鎖,再由Wait()解鎖?

呼叫方在對共享資源的條件進行判斷時,保證共享資源的狀態不被修改,因此進行加鎖。

而當共享資源不滿足當前goroutine的條件時,需要讓出共享資源的執行權,以便其他 goroutine 對其進行修改,因此進行解鎖。

為什麼使用for迴圈多次多次檢查共享資源條件?

  1. 如果存在多個 goroutine 同時等待通知,最終只有一個 goroutine 可以成功獲得執行許可權。那麼其他的 goroutine 應該在檢查不滿足執行條件後繼續等待。
  2. 共享資源存在多種狀態,狀態改變通知是基於鎖的,無法實現更細膩的判斷。這時需要每個 goroutine 對自己所需的狀態反覆檢查。
  3. 即使共享資源的狀態只有兩個,並且每種狀態都只有一個 goroutine 在關注,如上文展示,也應當使用 for 迴圈。因為一個 gorouinte 即使沒有收到條件通知,也可能被喚醒。這是多核 CPU 計算機硬體層面的排程機制。

sync.Cond 的應用場景

  1. 條件變數適合保護那些可執行兩個對立操作的共享資源。比如,一個既可讀又可寫的共享檔案。又比如,既有生產者又有消費者的產品池。
  2. 對於有著對立操作的共享資源(比如一個共享檔案),我們通常需要基於同一個讀寫鎖的兩個條件變數(比如 rcond 和 wcond)分別保護讀操作和寫操作(比如 rcond 保護讀,wcond 保護寫)。讀操作在操作完成後要向 wcond 發通知;寫操作在操作完成後要向 rcond 發通知。
//針對讀寫操作的控制只在初始化時有所變化
varlocksync.RWMutex
varsendCond,recvCond*sync.Cond

funcinit(){
sendCond=sync.NewCond(&lock)
recvCond=sync.NewCond(&lock.RLocker())
}

atomic operation(原子操作)

互斥鎖可以保證臨界區中程式碼的序列執行,但卻不能保證這些程式碼執行的原子性(atomicity)。

只有原子操作才能保證程式碼片段的原子性,原子操作由底層的 CPU 提供了晶片級別的支援

針對同一共享資源的原子操作不能同時進行,針對不同共享資源的原子操作可以同時進行。

因為原子操作不能被中斷,所以它需要足夠簡單和快速。

sync/atomic 提供了以下操作:

  • 加法(add)
  • 比較並交換(compare and swap,簡稱 CAS)
  • 載入(load)
  • 儲存(store)
  • 交換(swap)

支援的資料型別有:

  • int32
  • int64
  • uint32
  • uint64
  • uintptr
  • unsafe.Pointer

CAS 包含 2 步操作,但 Load、Store 這類操作只有一步,不具原子性嗎?

即使像 a = 1 這種簡單的賦值操作也並不一定能夠一次完成。如果右邊的值的儲存寬度超出了計算機的字寬,那麼實際的步驟就會多於一個(或者說底層指令多於一個)。比如,你計算機是32位的,但是你要把一個Int64型別的數賦給變數a,那麼底層指令就肯定多於一個。在這種情況下,多個底層指令的執行期間是可以被打斷的,也就是說CPU在這時可以被切換到別的任務上。如果新任務恰巧要讀寫這個變數a,那麼就會出現值不完整的問題。況且,就算是 a = 1,作業系統和CPU也都不保證這個操作一定不會被打斷。只要被打斷,就很有可能出現併發訪問上的問題,併發安全性也就被破壞了。所以,當有多個goroutine在併發的讀寫同一變數時,它們之間就可能會造成干擾。這種操作不是原子性,併發安全性也無法得到保障。

uint 型別的減法原子操作

//法一
varnumuint32
num=100
delta:=int32(-3)
atomic.AddUint32(&num,uint32(delta))
fmt.Println(num)//97

//法二
varnumuint32
num=100
delta:=-3
atomic.AddUint32(&num,^uint32(-delta-1))
fmt.Println(num)//97

自旋鎖(Spinlock)

自旋鎖(spinlock)是指當一個執行緒在獲取鎖的時候,如果鎖已經被其它執行緒獲取,那麼該執行緒將迴圈等待,然後不斷的判斷鎖是否能夠被成功獲取,直到獲取到鎖才會退出迴圈。

獲取鎖的執行緒一直處於活躍狀態,但是並沒有執行任何有效的任務,使用這種鎖會造成busy-waiting。

自旋鎖利用了 CPU 層面的指令,因此效能比互斥鎖高很多。適合簡單物件的操作以及衝突較少的場景。

varnumint32=10
for{
ifatomic.CompareAndSwapInt32(&num,10,0){
fmt.Println("Thesecondnumberhasgonetozero.")
break
}
time.Sleep(time.Millisecond*500)
}

這在效果上與互斥鎖有些類似。我們在使用互斥鎖的時候,總是假設共享資源的狀態會被其他的 goroutine 頻繁地改變。而for語句加 CAS 操作的假設往往是:共享資源狀態的改變並不頻繁,或者,它的狀態總會變成期望的那樣。這是一種更加樂觀,或者說更加寬鬆的做法。

Tips

  • 當真正使用了一個 atomic.Value 變數(第一次賦值)後,就不應該再進行復制操作了。
  • 不能儲存 nil 值。不過對於介面型別的變數,它的動態值是 nil,動態型別不是 nil,它就不是 nil。
  • 對於一個原子變數,向它儲存的第一個值決定了它的可儲存型別。即使是同一介面的不同型別,也是禁止更換的。對於暴露給外部的儲存函式,應當先判斷其儲存值的合法性。
  • 儲存引用型別時,注意不要把指標暴露給外部。

sync.Pool

sync.Pool 是一個臨時物件池。初次使用後禁止複製。它儲存的物件應該滿足以下特徵:

  • 不需要持久使用,對程式來說可有可無,物件的建立和銷燬不會影響程式功能。因為 Go 語言的 GC 每次執行時都會將臨時物件池清空。
  • 池子中的每一個物件都可以相互替代。

因此,sync.Pool 很適合作為快取池。

GC 是如何清理臨時物件池的?

sync 初始化時,向執行時系統註冊一個函式,這個函式用於清除所有已建立的臨時物件池中的值。這個函式在每次 GC 執行時被呼叫。sync 包中有一個全域性變數 allPools 負責儲存使用中的池列表,供池清理函式使用。

Pool 的內部實現

typePoolstruct{
noCopynoCopy

localunsafe.Pointer//per-Ppool,實際型別是[P]poolLocal
localSizeuintptr//sizeofthelocalarray

victimunsafe.Pointer//localfrompreviouscycle
victimSizeuintptr//sizeofvictimsarray

//建立一個臨時物件
Newfunc()interface{}
}

//Localper-PPool
typepoolLocalInternalstruct{
privateinterface{}//只能由當前P使用
sharedpoolChain//雙向佇列,LocalPcanpushHead/popHead;anyPcanpopTail.
}

Pool 提供了 Put 和 Get 方法用於存取臨時物件。存取臨時物件時,優先操作private,其次是 poolLocal 的共享臨時物件列表 shared (先訪問 goroutine 關聯的 P 對應的 poolLocal,再訪問非關聯的 poolLocal )。當 Get 無法找到可用的臨時物件,就會呼叫 New 建立以一個新的臨時物件。

sync.Map

sync.Map 是一個併發安全的字典。

//可自定義鍵型別和值型別的併發安全字典

typeConcurrentMapstruct{
msync.Map
keyTypereflect.Type
valueTypereflect.Type
}

funcNewConcurrentMap(keyType,valueTypereflect.Type)(*ConcurrentMap,error){
ifkeyType==nil{
returnnil,errors.New("nilkeytype")
}
if!keyType.Comparable(){
returnnil,fmt.Errorf("incomparablekeytype:%s",keyType)
}
ifvalueType==nil{
returnnil,errors.New("nilvaluetype")
}
cMap:=&ConcurrentMap{
keyType:keyType,
valueType:valueType,
}
returncMap,nil
}

func(cMap*ConcurrentMap)Delete(keyinterface{}){
ifreflect.TypeOf(key)!=cMap.keyType{
return
}
cMap.m.Delete(key)
}

func(cMap*ConcurrentMap)Load(keyinterface{})(valueinterface{},okbool){
ifreflect.TypeOf(key)!=cMap.keyType{
return
}
returncMap.m.Load(key)
}

func(cMap*ConcurrentMap)LoadOrStore(key,valueinterface{})(actualinterface{},loadedbool){
ifreflect.TypeOf(key)!=cMap.keyType{
panic(fmt.Errorf("wrongkeytype:%v",reflect.TypeOf(key)))
}
ifreflect.TypeOf(value)!=cMap.valueType{
panic(fmt.Errorf("wrongvaluetype:%v",reflect.TypeOf(value)))
}
actual,loaded=cMap.m.LoadOrStore(key,value)
return
}

func(cMap*ConcurrentMap)Range(ffunc(key,valueinterface{})bool){
cMap.m.Range(f)
}

func(cMap*ConcurrentMap)Store(key,valueinterface{}){
ifreflect.TypeOf(key)!=cMap.keyType{
panic(fmt.Errorf("wrongkeytype:%v",reflect.TypeOf(key)))
}
ifreflect.TypeOf(value)!=cMap.valueType{
panic(fmt.Errorf("wrongvaluetype:%v",reflect.TypeOf(value)))
}
cMap.m.Store(key,value)
}

sync.Map 的內部實現

typeMapstruct{
muMutex

//readcontainstheportionofthemap'scontentsthataresafefor
//concurrentaccess(withorwithoutmuheld).
//
//Thereadfielditselfisalwayssafetoload,butmustonlybestoredwith
//muheld.
//
//Entriesstoredinreadmaybeupdatedconcurrentlywithoutmu,butupdating
//apreviously-expungedentryrequiresthattheentrybecopiedtothedirty
//mapandunexpungedwithmuheld.
readatomic.Value//readOnly

//dirtycontainstheportionofthemap'scontentsthatrequiremutobe
//held.Toensurethatthedirtymapcanbepromotedtothereadmapquickly,
//italsoincludesallofthenon-expungedentriesinthereadmap.
//
//Expungedentriesarenotstoredinthedirtymap.Anexpungedentryinthe
//cleanmapmustbeunexpungedandaddedtothedirtymapbeforeanewvalue
//canbestoredtoit.
//
//Ifthedirtymapisnil,thenextwritetothemapwillinitializeitby
//makingashallowcopyofthecleanmap,omittingstaleentries.
dirtymap[interface{}]*entry

//missescountsthenumberofloadssincethereadmapwaslastupdatedthat
//neededtolockmutodeterminewhetherthekeywaspresent.
//
//Onceenoughmisseshaveoccurredtocoverthecostofcopyingthedirty
//map,thedirtymapwillbepromotedtothereadmap(intheunamended
//state)andthenextstoretothemapwillmakeanewdirtycopy.
missesint
}

Map.read 相當於字典的快照,支援更新和查詢操作,原子操作,不需要持有鎖。Map.dirty 是原生字典,支援增刪改查操作,所有操作需要持有鎖 mu

Map.readMap.dirty 中儲存的鍵值都是指標,而不是基本值。

查詢鍵值對時,首先去 read 字典查詢,如果沒找到,再加鎖去 dirty 字典查詢。

儲存鍵值對時,如果 read 字典中存在這個鍵,就直接更新。如果這個鍵被標記為“已刪除”,則儲存到 dirty 字典,清除“已刪除”的標記。

刪除鍵值時,如果只讀字典中不存在該鍵值對,就直接在 dirty 字典中進行刪除。如果只讀字典中存在該鍵值對,還要對其進行邏輯刪除(標記為“已刪除”)。

在髒字典中查詢鍵值對次數足夠多的時候,sync.Map 會把髒字典直接作為只讀字典,儲存在它的 read 欄位中,然後把代表髒字典的 dirty 欄位的值置為 nil。在這之後,一旦再有新的鍵值對存入,它就會依據只讀字典去重建髒字典。這個時候,它會把只讀字典中已被邏輯刪除的鍵值對過濾掉。

總的來說,只讀字典可能只包含部分鍵值對(含邏輯刪除鍵值對),而髒字典中始終包含全量的鍵值對(不含邏輯刪除鍵值對)。

sync.Map 適用於讀多寫少的情況,如果寫資料比較頻繁可以參考:https://github.com/orcaman/concurrent-map

goroutine的協作工具

sync.WaitGroup

用於同步 goroutine 的協作流程。它可以使一個 goroutine 在其協程完成後才繼續執行後續任務。

開始使用後禁止複製。

varwgsync.WaitGroup
funcmain(){
wg.Add(3)
fori:=0;i3;i++{
godoSomething()
}
wg.Wait()
}
funcdoSomething(){
deferwg.Done()
}

Tips

  • 禁止同時呼叫 WaitGroup 的 Add() 和 Wait(),即杜絕併發執行用 WaitGroup 的方法。原因是在 Wait() 執行時更改其計數器的值會引發 panic。

sync.Once

執行首次被呼叫時的入參函式,並且只執行一次。

Once() 中的 fail-fast 機制

func(o*Once)Do(ffunc()){
//Note:HereisanincorrectimplementationofDo:
//
//ifatomic.CompareAndSwapUint32(&o.done,0,1){
//f()
//}
//
//Doguaranteesthatwhenitreturns,fhasfinished.
//Thisimplementationwouldnotimplementthatguarantee:
//giventwosimultaneouscalls,thewinnerofthecaswould
//callf,andthesecondwouldreturnimmediately,without
//waitingforthefirst'scalltoftocomplete.
//Thisiswhytheslowpathfallsbacktoamutex,andwhy
//theatomic.StoreUint32mustbedelayeduntilafterfreturns.

ifatomic.LoadUint32(&o.done)==0{
//Outlinedslow-pathtoallowinliningofthefast-path.
o.doSlow(f)
}
}

func(o*Once)doSlow(ffunc()){
o.m.Lock()
defero.m.Unlock()
ifo.done==0{
deferatomic.StoreUint32(&o.done,1)
f()
}
}

由於 Once.Do() 保證在返回前 f() 已經執行完成,如果存在多個 goroutine 併發呼叫 Do(),會導致除了獲勝者,其餘 goroutine 都被阻塞在 o.m.Lock() 上。如果 f() 阻塞,可能會導致死鎖。

Once.Do() 不保證 f() 執行成功。

context.Context

funccoordinateWithContext(){
cxt,cancelFunc:=context.WithCancel(context.Background())
//啟動3個具有相關任務的協程
//如果有一個協程出現問題,取消其他協程
fori:=1;igofunc(){
r,e:=fn(ctx)
ife!=nil{
cancelFunc()
}
}
}
time.Sleep(10*time.Second)
fmt.Println("End.")
}

funcfn(ctxcontext.Context)string,error{
resp:=make(chanstring)
err:=make(chanerror)
gofunc(){
responseString,e:=doSomething()
ife!=nil{
err}else{
resp}
}()
select{
casereturn"",ctx.Err()
caser:=returnr,nil
casee:=return"",e
}
}

Context.Done() 返回一個 型別的值,這是一個接收通道。呼叫 cancelFunc() 時,該通道會關閉,阻塞的接收操作會立刻返回。

Context 型別值的撤銷操作會聯動它的子值。

Context 型別還提供了 WithDeadline()WithTimeout() 方法,生成擁有生命週期的 Context 型別。

此外,Context.WithValue() 可以提供協程間的資料傳輸功能。在 Context 中查詢資料時,先在當前 Context 中查詢,如果沒找到,再去父值中查詢。不過 Context 不提供資料更新的方法,只能通過 在子值中覆蓋同名數據、或撤銷 Context 丟棄資料 間接實現。

本文作者:entelecheia

原文連結:https://www.cnblogs.com/win-for-life/p/13372984.html


推薦閱讀

  • 面試官讓我用channel實現sync包裡的同步鎖,是不是故意為難我?

學習交流 Go 語言,掃碼回覆「進群」即可

1ba05265e3a18ae416f1b3345f1cbe05.png

站長 polarisxu

自己的原創文章

不限於 Go 技術

職場和創業經驗

Go語言中文網

每天為你

分享 Go 知識

Go愛好者值得關注

b52fa66df008f0363d72b9e479788b96.png