執行緒互斥與同步 在c#中用mutex類實現執行緒的互斥_全面講解 goroutine 間的同步&協作...
技術標籤:執行緒互斥與同步 在c#中用mutex類實現執行緒的互斥
點選上方藍色“ Go語言中文網 ”關注我們, 領全套Go資料 ,每天學習Go語言基礎概念
競態條件
一份資料被多個執行緒共享,可能會產生爭用和衝突的情況。這種情況被稱為競態條件,競態條件會破壞共享資料的一致性,影響一些執行緒中程式碼和流程的正確執行。
同步
同步可以解決競態問題。它本質上是在控制多個執行緒對共享資源的訪問。這種控制主要包含兩點:
- 避免多個執行緒在同一時刻操作同一個資料塊。
- 協調多個執行緒,以避免它們在同一時刻執行同一個程式碼塊。
在同步控制下,多個併發執行的執行緒對這個共享資源的訪問是完全序列的。對這個共享資源進行操作的程式碼片段可以視為一個臨界區。
互斥量 sync.Mutex
一個互斥鎖可以被用來保護一個臨界區或者一組相關臨界區。它可以保證,在同一時刻只有一個 goroutine 處於該臨界區之內。
每當有 goroutine 想進入臨界區時,都需要先加鎖,每個 goroutine 離開臨界區時,都要及時解鎖。
Mutex 的使用
varmutexsync.Mutex
funcupdatePublicResource(){
mutex.Lock()
doUpdate()
mutex.Unlock()
}
使用互斥鎖的注意事項:
- 不要重複鎖定互斥鎖。
- 不要忘記解鎖互斥鎖,推薦使用defer。
- 不要對尚未鎖定或者已解鎖的互斥鎖解鎖。
- 不要在多個函式之間直接傳遞互斥鎖。(即,不要複製鎖)
對一個已經被鎖定的互斥鎖進行鎖定,會阻塞當前的 goroutine 。如果其他的使用者級 goroutine 也處於等待狀態,整個程式就停止執行了,Go 語言執行時系統會丟擲一個死鎖的 panic 錯誤,程式就會崩潰。因此,切記,每一個鎖定操作,都要有且只有一個對應的解鎖操作。
讀寫鎖 sync.RWMutex
讀寫鎖是讀 / 寫互斥鎖的簡稱,讀寫鎖是互斥鎖的一種擴充套件。一個讀寫鎖中包含了兩個鎖,即:讀鎖和寫鎖。
讀寫鎖可以對共享資源的“讀操作”和“寫操作”進行區別,實現更加細膩的訪問控制。
對於某個受到讀寫鎖保護的共享資源,多個寫操作不能同時進行,寫操作和讀操作也不能同時進行,多個讀操作可以同時進行。
varmutexsync.RWMutex
funcupdatePublicResource(){
mutex.Lock()
doUpdate()
mutex.Unlock()
}
funcreadPublicResource(){
mutex.RLock()
read()
mutex.RUnlock()
}
對寫鎖進行解鎖,會喚醒“所有因試圖鎖定讀鎖,而被阻塞的 goroutine”,通常它們都能成功完成對讀鎖的鎖定。
對讀鎖進行解鎖,會在沒有其他鎖定中讀鎖的前提下,喚醒“因試圖鎖定寫鎖,而被阻塞的 goroutine”;只有一個等待時間最長的被喚醒的 goroutine 能夠成功完成對寫鎖的鎖定。
讀寫鎖是互斥鎖的擴充套件,因此有些方面它還是沿用了互斥鎖的行為模式。比如,解鎖未被鎖定的寫鎖或讀鎖,會立刻引發 panic。
條件變數 sync.Cond
條件變數是基於互斥鎖的,它不用於保護臨界區和共享資源,而是用於協調想要訪問共享資源的那些執行緒的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的執行緒。
io.Pipe 的實現就基於 sync.Cond。
sync.Cond 需要 sync.Locker 型別的引數用於初始化。
typeLockerinterface{
Lock()
Unlock()
}
noCopy
大多數同步工具禁止在使用後進行復制。Golang 使用兩個內嵌欄位實現 coCopy 功能:noCopy 和 checker。noCopy 欄位用於程式碼檢查工具,checker 欄位用於保證執行時不發生複製。
typeCondstruct{
//用於標識當前結構體在第一次使用後不應該再複製
//用於govet編譯檢查
noCopynoCopy
//Cond基於的鎖
LLocker
//一個基於ticket的通知列表
//儲存了goroutine資訊的雙向連結串列
notifynotifyList
//保證執行時發生拷貝丟擲panic
//在第一次生成時,初始化為Cond地址,如果發生複製,複製物件的地址和當前地址將會不同
checkercopyChecker
}
sync.Cond 提供 3 個方法:
- Broadcast():喚醒所有等待 Cond 的 goroutine。不需要在鎖的保護下進行。
- Signal():喚醒一個等待 Cond 的 goroutine。不需要在鎖的保護下進行。
- Wait():解鎖互斥鎖,掛起當前 goroutine。當 Broadcast 或 Signal 喚醒這個 goroutine,Wait 在返回前會再鎖定互斥鎖。因此 Wait() 需要在鎖的保護下進行。
varlocksync.RWMutex
varsendCond,recvCond*sync.Cond
funcinit(){
sendCond=sync.NewCond(&lock)
recvCond=sync.NewCond(&lock)//獲取讀寫鎖中的讀鎖
}
funcsend(){
lock.Lock()
for!writeCondition(){
sendCond.Wait()
}
writeResource()
lock.Unlock()
recvCond.Signal()//如果有多個接收的goroutine就使用recvCond.Broadcast()
}
funcreceive(){
lock.Lock()
for!readCondition(){
recvCond.Wait()
}
receiveResource()
lock.Unlock()
sendCond.Signal()//如果有多個傳送的goroutine就使用
}
有時 sync.Cond 的功能用 channel 也能實現,不過 channel 的意義更多地在於傳遞資料,而 sync.Cond 的意義在於協程的協作;並且 sync.Cond 更為底層,效率更高。
Tips
- Cond 在第一次使用後不能複製。
- 條件變數的通知具有即時性。如果傳送通知的時候沒有 goroutine 為此等待,該通知就會被直接丟棄。
- Signal() 和 Broadcast() 需要在非鎖定的情況下呼叫,因為 Wait() 的呼叫方處於阻塞狀態,可能錯過通知。
- Wait() 的呼叫需要基於鎖定狀態。
sync.Cond.Wait()
func(c*Cond)Wait(){
//檢查是否發生複製
c.checker.check()
//將當前gorouitne加入當前條件變數的通知佇列
t:=runtime_notifyListAdd(&c.notify)
c.L.Unlock()
//阻塞當前的goroutine,直至收到通知
runtime_notifyListWait(&c.notify,t)
//收到通知後,加鎖,進入臨界區
c.L.Lock()
}
為什麼要由呼叫方先加鎖,再由Wait()解鎖?
呼叫方在對共享資源的條件進行判斷時,保證共享資源的狀態不被修改,因此進行加鎖。
而當共享資源不滿足當前goroutine的條件時,需要讓出共享資源的執行權,以便其他 goroutine 對其進行修改,因此進行解鎖。
為什麼使用for迴圈多次多次檢查共享資源條件?
- 如果存在多個 goroutine 同時等待通知,最終只有一個 goroutine 可以成功獲得執行許可權。那麼其他的 goroutine 應該在檢查不滿足執行條件後繼續等待。
- 共享資源存在多種狀態,狀態改變通知是基於鎖的,無法實現更細膩的判斷。這時需要每個 goroutine 對自己所需的狀態反覆檢查。
- 即使共享資源的狀態只有兩個,並且每種狀態都只有一個 goroutine 在關注,如上文展示,也應當使用 for 迴圈。因為一個 gorouinte 即使沒有收到條件通知,也可能被喚醒。這是多核 CPU 計算機硬體層面的排程機制。
sync.Cond 的應用場景
- 條件變數適合保護那些可執行兩個對立操作的共享資源。比如,一個既可讀又可寫的共享檔案。又比如,既有生產者又有消費者的產品池。
- 對於有著對立操作的共享資源(比如一個共享檔案),我們通常需要基於同一個讀寫鎖的兩個條件變數(比如 rcond 和 wcond)分別保護讀操作和寫操作(比如 rcond 保護讀,wcond 保護寫)。讀操作在操作完成後要向 wcond 發通知;寫操作在操作完成後要向 rcond 發通知。
//針對讀寫操作的控制只在初始化時有所變化
varlocksync.RWMutex
varsendCond,recvCond*sync.Cond
funcinit(){
sendCond=sync.NewCond(&lock)
recvCond=sync.NewCond(&lock.RLocker())
}
atomic operation(原子操作)
互斥鎖可以保證臨界區中程式碼的序列執行,但卻不能保證這些程式碼執行的原子性(atomicity)。
只有原子操作才能保證程式碼片段的原子性,原子操作由底層的 CPU 提供了晶片級別的支援。
針對同一共享資源的原子操作不能同時進行,針對不同共享資源的原子操作可以同時進行。
因為原子操作不能被中斷,所以它需要足夠簡單和快速。
sync/atomic 提供了以下操作:
- 加法(add)
- 比較並交換(compare and swap,簡稱 CAS)
- 載入(load)
- 儲存(store)
- 交換(swap)
支援的資料型別有:
- int32
- int64
- uint32
- uint64
- uintptr
- unsafe.Pointer
CAS 包含 2 步操作,但 Load、Store 這類操作只有一步,不具原子性嗎?
即使像 a = 1 這種簡單的賦值操作也並不一定能夠一次完成。如果右邊的值的儲存寬度超出了計算機的字寬,那麼實際的步驟就會多於一個(或者說底層指令多於一個)。比如,你計算機是32位的,但是你要把一個Int64型別的數賦給變數a,那麼底層指令就肯定多於一個。在這種情況下,多個底層指令的執行期間是可以被打斷的,也就是說CPU在這時可以被切換到別的任務上。如果新任務恰巧要讀寫這個變數a,那麼就會出現值不完整的問題。況且,就算是 a = 1,作業系統和CPU也都不保證這個操作一定不會被打斷。只要被打斷,就很有可能出現併發訪問上的問題,併發安全性也就被破壞了。所以,當有多個goroutine在併發的讀寫同一變數時,它們之間就可能會造成干擾。這種操作不是原子性,併發安全性也無法得到保障。
uint 型別的減法原子操作
//法一
varnumuint32
num=100
delta:=int32(-3)
atomic.AddUint32(&num,uint32(delta))
fmt.Println(num)//97
//法二
varnumuint32
num=100
delta:=-3
atomic.AddUint32(&num,^uint32(-delta-1))
fmt.Println(num)//97
自旋鎖(Spinlock)
自旋鎖(spinlock)是指當一個執行緒在獲取鎖的時候,如果鎖已經被其它執行緒獲取,那麼該執行緒將迴圈等待,然後不斷的判斷鎖是否能夠被成功獲取,直到獲取到鎖才會退出迴圈。
獲取鎖的執行緒一直處於活躍狀態,但是並沒有執行任何有效的任務,使用這種鎖會造成busy-waiting。
自旋鎖利用了 CPU 層面的指令,因此效能比互斥鎖高很多。適合簡單物件的操作以及衝突較少的場景。
varnumint32=10
for{
ifatomic.CompareAndSwapInt32(&num,10,0){
fmt.Println("Thesecondnumberhasgonetozero.")
break
}
time.Sleep(time.Millisecond*500)
}
這在效果上與互斥鎖有些類似。我們在使用互斥鎖的時候,總是假設共享資源的狀態會被其他的 goroutine 頻繁地改變。而for語句加 CAS 操作的假設往往是:共享資源狀態的改變並不頻繁,或者,它的狀態總會變成期望的那樣。這是一種更加樂觀,或者說更加寬鬆的做法。
Tips
- 當真正使用了一個 atomic.Value 變數(第一次賦值)後,就不應該再進行復制操作了。
- 不能儲存 nil 值。不過對於介面型別的變數,它的動態值是 nil,動態型別不是 nil,它就不是 nil。
- 對於一個原子變數,向它儲存的第一個值決定了它的可儲存型別。即使是同一介面的不同型別,也是禁止更換的。對於暴露給外部的儲存函式,應當先判斷其儲存值的合法性。
- 儲存引用型別時,注意不要把指標暴露給外部。
sync.Pool
sync.Pool
是一個臨時物件池。初次使用後禁止複製。它儲存的物件應該滿足以下特徵:
- 不需要持久使用,對程式來說可有可無,物件的建立和銷燬不會影響程式功能。因為 Go 語言的 GC 每次執行時都會將臨時物件池清空。
- 池子中的每一個物件都可以相互替代。
因此,sync.Pool
很適合作為快取池。
GC 是如何清理臨時物件池的?
sync
初始化時,向執行時系統註冊一個函式,這個函式用於清除所有已建立的臨時物件池中的值。這個函式在每次 GC 執行時被呼叫。sync 包中有一個全域性變數 allPools
負責儲存使用中的池列表,供池清理函式使用。
Pool 的內部實現
typePoolstruct{
noCopynoCopy
localunsafe.Pointer//per-Ppool,實際型別是[P]poolLocal
localSizeuintptr//sizeofthelocalarray
victimunsafe.Pointer//localfrompreviouscycle
victimSizeuintptr//sizeofvictimsarray
//建立一個臨時物件
Newfunc()interface{}
}
//Localper-PPool
typepoolLocalInternalstruct{
privateinterface{}//只能由當前P使用
sharedpoolChain//雙向佇列,LocalPcanpushHead/popHead;anyPcanpopTail.
}
Pool
提供了 Put 和 Get 方法用於存取臨時物件。存取臨時物件時,優先操作private,其次是 poolLocal
的共享臨時物件列表 shared
(先訪問 goroutine
關聯的 P
對應的 poolLocal
,再訪問非關聯的 poolLocal
)。當 Get
無法找到可用的臨時物件,就會呼叫 New
建立以一個新的臨時物件。
sync.Map
sync.Map
是一個併發安全的字典。
//可自定義鍵型別和值型別的併發安全字典
typeConcurrentMapstruct{
msync.Map
keyTypereflect.Type
valueTypereflect.Type
}
funcNewConcurrentMap(keyType,valueTypereflect.Type)(*ConcurrentMap,error){
ifkeyType==nil{
returnnil,errors.New("nilkeytype")
}
if!keyType.Comparable(){
returnnil,fmt.Errorf("incomparablekeytype:%s",keyType)
}
ifvalueType==nil{
returnnil,errors.New("nilvaluetype")
}
cMap:=&ConcurrentMap{
keyType:keyType,
valueType:valueType,
}
returncMap,nil
}
func(cMap*ConcurrentMap)Delete(keyinterface{}){
ifreflect.TypeOf(key)!=cMap.keyType{
return
}
cMap.m.Delete(key)
}
func(cMap*ConcurrentMap)Load(keyinterface{})(valueinterface{},okbool){
ifreflect.TypeOf(key)!=cMap.keyType{
return
}
returncMap.m.Load(key)
}
func(cMap*ConcurrentMap)LoadOrStore(key,valueinterface{})(actualinterface{},loadedbool){
ifreflect.TypeOf(key)!=cMap.keyType{
panic(fmt.Errorf("wrongkeytype:%v",reflect.TypeOf(key)))
}
ifreflect.TypeOf(value)!=cMap.valueType{
panic(fmt.Errorf("wrongvaluetype:%v",reflect.TypeOf(value)))
}
actual,loaded=cMap.m.LoadOrStore(key,value)
return
}
func(cMap*ConcurrentMap)Range(ffunc(key,valueinterface{})bool){
cMap.m.Range(f)
}
func(cMap*ConcurrentMap)Store(key,valueinterface{}){
ifreflect.TypeOf(key)!=cMap.keyType{
panic(fmt.Errorf("wrongkeytype:%v",reflect.TypeOf(key)))
}
ifreflect.TypeOf(value)!=cMap.valueType{
panic(fmt.Errorf("wrongvaluetype:%v",reflect.TypeOf(value)))
}
cMap.m.Store(key,value)
}
sync.Map 的內部實現
typeMapstruct{
muMutex
//readcontainstheportionofthemap'scontentsthataresafefor
//concurrentaccess(withorwithoutmuheld).
//
//Thereadfielditselfisalwayssafetoload,butmustonlybestoredwith
//muheld.
//
//Entriesstoredinreadmaybeupdatedconcurrentlywithoutmu,butupdating
//apreviously-expungedentryrequiresthattheentrybecopiedtothedirty
//mapandunexpungedwithmuheld.
readatomic.Value//readOnly
//dirtycontainstheportionofthemap'scontentsthatrequiremutobe
//held.Toensurethatthedirtymapcanbepromotedtothereadmapquickly,
//italsoincludesallofthenon-expungedentriesinthereadmap.
//
//Expungedentriesarenotstoredinthedirtymap.Anexpungedentryinthe
//cleanmapmustbeunexpungedandaddedtothedirtymapbeforeanewvalue
//canbestoredtoit.
//
//Ifthedirtymapisnil,thenextwritetothemapwillinitializeitby
//makingashallowcopyofthecleanmap,omittingstaleentries.
dirtymap[interface{}]*entry
//missescountsthenumberofloadssincethereadmapwaslastupdatedthat
//neededtolockmutodeterminewhetherthekeywaspresent.
//
//Onceenoughmisseshaveoccurredtocoverthecostofcopyingthedirty
//map,thedirtymapwillbepromotedtothereadmap(intheunamended
//state)andthenextstoretothemapwillmakeanewdirtycopy.
missesint
}
Map.read
相當於字典的快照,支援更新和查詢操作,原子操作,不需要持有鎖。Map.dirty
是原生字典,支援增刪改查操作,所有操作需要持有鎖 mu
。
Map.read
和 Map.dirty
中儲存的鍵值都是指標,而不是基本值。
查詢鍵值對時,首先去 read
字典查詢,如果沒找到,再加鎖去 dirty
字典查詢。
儲存鍵值對時,如果 read
字典中存在這個鍵,就直接更新。如果這個鍵被標記為“已刪除”,則儲存到 dirty
字典,清除“已刪除”的標記。
刪除鍵值時,如果只讀字典中不存在該鍵值對,就直接在 dirty
字典中進行刪除。如果只讀字典中存在該鍵值對,還要對其進行邏輯刪除(標記為“已刪除”)。
在髒字典中查詢鍵值對次數足夠多的時候,sync.Map
會把髒字典直接作為只讀字典,儲存在它的 read
欄位中,然後把代表髒字典的 dirty
欄位的值置為 nil。在這之後,一旦再有新的鍵值對存入,它就會依據只讀字典去重建髒字典。這個時候,它會把只讀字典中已被邏輯刪除的鍵值對過濾掉。
總的來說,只讀字典可能只包含部分鍵值對(含邏輯刪除鍵值對),而髒字典中始終包含全量的鍵值對(不含邏輯刪除鍵值對)。
sync.Map
適用於讀多寫少的情況,如果寫資料比較頻繁可以參考:https://github.com/orcaman/concurrent-map
goroutine的協作工具
sync.WaitGroup
用於同步 goroutine 的協作流程。它可以使一個 goroutine 在其協程完成後才繼續執行後續任務。
開始使用後禁止複製。
varwgsync.WaitGroup
funcmain(){
wg.Add(3)
fori:=0;i3;i++{
godoSomething()
}
wg.Wait()
}
funcdoSomething(){
deferwg.Done()
}
Tips
- 禁止同時呼叫 WaitGroup 的 Add() 和 Wait(),即杜絕併發執行用 WaitGroup 的方法。原因是在 Wait() 執行時更改其計數器的值會引發 panic。
sync.Once
執行首次被呼叫時的入參函式,並且只執行一次。
Once() 中的 fail-fast 機制
func(o*Once)Do(ffunc()){
//Note:HereisanincorrectimplementationofDo:
//
//ifatomic.CompareAndSwapUint32(&o.done,0,1){
//f()
//}
//
//Doguaranteesthatwhenitreturns,fhasfinished.
//Thisimplementationwouldnotimplementthatguarantee:
//giventwosimultaneouscalls,thewinnerofthecaswould
//callf,andthesecondwouldreturnimmediately,without
//waitingforthefirst'scalltoftocomplete.
//Thisiswhytheslowpathfallsbacktoamutex,andwhy
//theatomic.StoreUint32mustbedelayeduntilafterfreturns.
ifatomic.LoadUint32(&o.done)==0{
//Outlinedslow-pathtoallowinliningofthefast-path.
o.doSlow(f)
}
}
func(o*Once)doSlow(ffunc()){
o.m.Lock()
defero.m.Unlock()
ifo.done==0{
deferatomic.StoreUint32(&o.done,1)
f()
}
}
由於 Once.Do()
保證在返回前 f()
已經執行完成,如果存在多個 goroutine 併發呼叫 Do(),會導致除了獲勝者,其餘 goroutine 都被阻塞在 o.m.Lock()
上。如果 f()
阻塞,可能會導致死鎖。
Once.Do()
不保證 f()
執行成功。
context.Context
funccoordinateWithContext(){
cxt,cancelFunc:=context.WithCancel(context.Background())
//啟動3個具有相關任務的協程
//如果有一個協程出現問題,取消其他協程
fori:=1;igofunc(){
r,e:=fn(ctx)
ife!=nil{
cancelFunc()
}
}
}
time.Sleep(10*time.Second)
fmt.Println("End.")
}
funcfn(ctxcontext.Context)string,error{
resp:=make(chanstring)
err:=make(chanerror)
gofunc(){
responseString,e:=doSomething()
ife!=nil{
err}else{
resp}
}()
select{
casereturn"",ctx.Err()
caser:=returnr,nil
casee:=return"",e
}
}
Context.Done()
返回一個 型別的值,這是一個接收通道。呼叫
cancelFunc()
時,該通道會關閉,阻塞的接收操作會立刻返回。
Context
型別值的撤銷操作會聯動它的子值。
Context
型別還提供了 WithDeadline()
和 WithTimeout()
方法,生成擁有生命週期的 Context
型別。
此外,Context.WithValue()
可以提供協程間的資料傳輸功能。在 Context 中查詢資料時,先在當前 Context 中查詢,如果沒找到,再去父值中查詢。不過 Context 不提供資料更新的方法,只能通過 在子值中覆蓋同名數據、或撤銷 Context 丟棄資料 間接實現。
本文作者:entelecheia
原文連結:https://www.cnblogs.com/win-for-life/p/13372984.html
推薦閱讀
面試官讓我用channel實現sync包裡的同步鎖,是不是故意為難我?
站長 polarisxu
自己的原創文章
不限於 Go 技術
職場和創業經驗
Go語言中文網
每天為你
分享 Go 知識
Go愛好者值得關注