
How the mutex lock is implemented in golang

Locks in golang are built on top of CAS atomic operations. The Mutex struct looks like this:

    type Mutex struct {
        state int32
        sema  uint32
    }

    // state is the current state of the lock; every bit has a meaning, and the zero value means unlocked.
    // sema is used as a semaphore: P/V operations on it block and wake goroutines on a wait queue.
    // A goroutine waiting for the lock is parked on that queue and sleeps without being scheduled,
    // and is only woken when the lock is unlocked. The details are in the Lock function in sync/mutex.go.

A quick digression on sema

Although sema is just an integer field in Mutex, it is an essential piece: it is the semaphore that manages putting goroutines to sleep and waking them up. I have not read the sema implementation in detail yet, so what follows is a rough functional sketch -- it is not guaranteed to be accurate!

First of all, sema gives goroutines a kind of "scheduling" primitive: it can block them and wake them up. Acquiring the semaphore happens in semacquire1 in runtime/sema.go; releasing it happens in semrelease1.

Inside sema there is a semaRoot struct and a global semtable variable. One semaRoot serves the P/V operations of one semaphore (my guess is this is related to the G-M-P scheduling model: one Processor carries many goroutines, and the goroutines blocked on one semaphore need a small lock of their own while their state is being changed -- that is the lock field below, which is sema's own lightweight mutex and is not the same thing as sync.Mutex). Allocating and looking up semaRoots goes through the global semtable.

    type semaRoot struct {
        lock  mutex
        treap *sudog // root of balanced tree of unique waiters.
        nwait uint32 // Number of waiters. Read w/o the lock.
    }

    var semtable [semTabSize]struct {
        root semaRoot
        pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
    }

1. Putting the current goroutine to sleep is done through goparkunlock, which semacquire1 reaches like this:

   1) root := semroot(addr)
      semroot maps the semaphore's address to its semaRoot.
   2) Skipping ahead to the point where the current goroutine is actually put to sleep:
      first lock(&root.lock) is taken, then root.queue() puts the current goroutine on the wait queue. (One semaphore manages many goroutines, so before a goroutine sleeps its details have to be saved somewhere: it is hung off the treap in semaRoot -- according to the comments the waiter queue is implemented as a balanced tree.)
   3) goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4) is called.
      This eventually reaches gopark, which triggers another round of scheduling. Before rescheduling, the current goroutine -- the G object -- is marked as sleeping, so it is no longer scheduled until it is woken, and root.lock is released. This call is the point where execution is handed back to the runtime scheduler, which goes off and runs other goroutines.

2. Since goroutines can block, there has to be a wake-up mechanism.

   Waking also goes through the semtable structure. sema.go is not written specifically for the mutex; when Mutex uses it, semrelease1 is called by whichever goroutine releases the Mutex, and it wakes a goroutine from the queue so it can run. I have not read that path in detail.

   From the analysis so far: since Mutex is a mutual-exclusion lock, the semaphore inside it should be a binary semaphore that only takes the values 0 and 1. When Lock gets as far as semacquire1 and the semaphore is found to be 0, the current goroutine is put to sleep:

    func cansemacquire(addr *uint32) bool {
        for {
            v := atomic.Load(addr)
            if v == 0 {
                return false
            }
            if atomic.Cas(addr, v, v-1) {
                return true
            }
        }
    }

   If goroutines keep trying to take the Mutex, they keep seeing the semaphore at 0 and keep falling asleep one after another. Only unlock increments the semaphore, and unlock must not be called repeatedly, so this semaphore should only ever be 0 or 1.

That is a rough look at sema; back to Mutex.

The sema field was covered above. state is the more central field in Mutex: it describes the current state of the lock, bit by bit:

    state (int32), from low bit to high:
      bit 0      : locked        -- 0 unlocked, 1 locked
      bit 1      : mutexWoken    -- 1 if a goroutine has already been woken, 0 otherwise
      bit 2      : mutexStarving -- which mode the Mutex is in: 0 normal, 1 starving
      bits 3..31 : the number of goroutines waiting to Lock this mutex

First, the normal and starving modes. The two modes exist to make the lock fair. The comment block in the source explains them; here is a translation taken from http://blog.51cto.com/qiangmzsx/2134786:

A mutex has two modes of operation: normal and starvation.

In normal mode, waiting goroutines queue up in FIFO order, but a woken goroutine does not get the mutex immediately -- it has to compete with newly arriving goroutines. Because the new arrivals are already running on a CPU, the woken goroutine is very likely to lose that race; when that happens it is put back at the front of the queue. If a woken goroutine fails to get the mutex for more than 1ms, the mutex switches into starvation mode.

In starvation mode, the mutex is handed directly from the unlocking goroutine to the goroutine at the front of the queue. Newly arriving goroutines do not try to grab the mutex (even if it looks unlocked they do not spin); they go to the tail of the wait queue instead.

Once a goroutine has acquired the mutex in starvation mode, the mutex switches back to normal mode if either of the following holds:
1. it is the last goroutine in the wait queue, or
2. it waited for less than 1ms.

Normal mode has much better performance, because a goroutine can acquire the mutex several times in a row; starvation mode is there to prevent the goroutines at the tail of the queue from never getting the mutex at all.
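To make the bit layout concrete before walking through Lock: the constants below mirror the ones declared at the top of sync/mutex.go, while decodeState is a hypothetical helper added only for illustration -- the real sync.Mutex never exposes its state.

    package main

    import "fmt"

    // These mirror the constants at the top of sync/mutex.go.
    const (
        mutexLocked      = 1 << iota // bit 0: mutex is locked
        mutexWoken                   // bit 1: a waiter has been woken (or a spinner claimed the flag)
        mutexStarving                // bit 2: mutex is in starvation mode
        mutexWaiterShift = iota      // waiter count lives in bits 3..31
    )

    // decodeState is a hypothetical helper, purely for illustrating the layout.
    func decodeState(state int32) (locked, woken, starving bool, waiters int32) {
        return state&mutexLocked != 0,
            state&mutexWoken != 0,
            state&mutexStarving != 0,
            state >> mutexWaiterShift
    }

    func main() {
        // 0b1001 = locked, not woken, normal mode, one waiter.
        locked, woken, starving, waiters := decodeState(0b1001)
        fmt.Println(locked, woken, starving, waiters) // true false false 1
    }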
Now the concrete implementation, starting with the Lock function:

    // Fast path: grab unlocked mutex.
    // 1. Use a single atomic CAS to flip the state to locked.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

Many goroutines may try to grab the Mutex at any moment, and m.state changes all the time, so there are a lot of possible scenarios; I will only walk through the typical ones.

1) Suppose the mutex is in its initial state, m.state = 0. The current goroutine acquires the lock right here: m.state becomes mutexLocked, i.e. m.state = 00...001 -- locked, not woken, normal mode.

   Lucky case: it grabs the lock the moment it arrives, exactly as described above -- it is already on a CPU when it shows up, and nobody happens to hold the lock. Born with a halo, heh. Lock returns.

   If this goroutine does not release the lock, the next goroutine that comes along cannot lock it here and falls through to step 2. (A minimal usage sketch of a contended mutex follows below.)
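None of the internal state above is visible from user code. A minimal, self-contained usage sketch (illustrative only): the first goroutine to arrive at an uncontended mutex takes the fast path, everyone else contends and goes through the slow path walked through next.

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var (
            mu      sync.Mutex
            counter int
            wg      sync.WaitGroup
        )
        // The first goroutine to arrive usually takes the fast path;
        // the rest contend and go through the slow path described below.
        for i := 0; i < 8; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < 1000; j++ {
                    mu.Lock()
                    counter++
                    mu.Unlock()
                }
            }()
        }
        wg.Wait()
        fmt.Println(counter) // always 8000
    }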
2) Next comes a for loop. Roughly: keep trying to grab the lock; if you cannot get it, sleep for a while and wait to be woken; when woken, check whether you have waited so long that you are starving; if so, switch to starving mode and get served with priority; otherwise, just keep waiting.

    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state    // m.state was set to 001 above, so old = 001
    for {
        // Don't spin in starvation mode, ownership is handed off to waiters
        // so we won't be able to acquire the mutex anyway.
        // old = 001: the lock is held.
        // runtime_canSpin then decides whether spinning is worthwhile, based on iter,
        // which is incremented on every pass through the loop.
        // The conditions for spinning: a multi-core machine, GOMAXPROCS > 1, at least one
        // other P that is actually running, and an empty local run queue -- presumably to
        // avoid spinning on a single core and stalling the whole program. Spinning is also
        // capped: once iter reaches 4, runtime_canSpin returns false and the if is skipped.
        // Assume a multi-core machine here, so we enter the if.
        // old is re-read on every iteration. The point of spinning is to wait for the lock
        // to be released so the goroutine already on the CPU can take it -- Go tries hard
        // to give the lock to whoever is currently running.
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // Active spinning makes sense.
            // Try to set mutexWoken flag to inform Unlock
            // to not wake other blocked goroutines.
            // awoke is still false here. If the woken bit is not yet set and there are
            // goroutines waiting, try to claim the woken bit, so that Unlock does not bother
            // waking a blocked waiter -- this spinning goroutine intends to take the lock itself.
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()     // spin: execute ~30 cheap no-op (PAUSE) instructions
            iter++
            old = m.state        // re-read the state; if some other goroutine released the
                                 // lock, the next iteration will not enter this if again
            continue             // go around and spin once more
        }
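As an aside, the spin conditions mentioned in the comments above come from sync_runtime_canSpin in runtime/proc.go. The sketch below is a simplified paraphrase under that assumption -- names and details differ between Go versions -- and is not the literal runtime source.

    package main

    import "fmt"

    // canSpin is a simplified paraphrase of sync_runtime_canSpin (runtime/proc.go);
    // it is not the literal source, and details differ between Go versions.
    const activeSpin = 4 // maximum number of spin iterations

    func canSpin(iter, ncpu, gomaxprocs, idleOrSpinningPs int, localRunqEmpty bool) bool {
        if iter >= activeSpin || ncpu <= 1 || gomaxprocs <= idleOrSpinningPs+1 {
            return false // spun enough, single core, or no other P is actually running
        }
        if !localRunqEmpty {
            return false // runnable work is queued on this P; don't burn CPU spinning
        }
        return true
    }

    func main() {
        // 8 cores, GOMAXPROCS=8, 2 Ps idle/spinning, empty local run queue: spinning is allowed.
        fmt.Println(canSpin(0, 8, 8, 2, true)) // true
        // Single core: never spin.
        fmt.Println(canSpin(0, 1, 1, 0, true)) // false
    }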
Coming out of the spin if, there are two cases.

2.1) Some other goroutine called Unlock, so the if above sees the lock is no longer held and is skipped. At this point m.state = 000, old = 000, awoke = false and no goroutine is waiting -- the simplest possible case.

    new := old                  // new = 000, old = 000, m.state = 000, awoke = false.
                                // new is initialised here; below it is built up into the lock
                                // state we want, and m.state is then set to new.
    // Don't try to acquire starving mutex, new arriving goroutines must queue.
    if old&mutexStarving == 0 {     // the lock is not in starving mode, so the goroutine that is
                                    // currently running gets to take it; in starving mode it would
                                    // have to queue up and leave the lock to the waiters in the
                                    // queue who are about to starve
        new |= mutexLocked          // new = 001: we want to lock it
    }
    if old&(mutexLocked|mutexStarving) != 0 {   // old = 000, so this is skipped: the currently
                                                // running goroutine will take the lock, not join
                                                // the queue; new stays 001
        new += 1 << mutexWaiterShift
    }
    // The current goroutine switches mutex to starvation mode.
    // But if the mutex is currently unlocked, don't do the switch.
    // Unlock expects that starving mutex has waiters, which will not
    // be true in this case.
    if starving && old&mutexLocked != 0 {   // starving = false. starving only becomes true after a
                                            // goroutine is woken by Unlock and notices it waited too
                                            // long; in that case the other queued goroutines have
                                            // also been waiting a while, so a free lock should go to
                                            // the queue first. That logic comes later; here the if
                                            // is skipped and new = 001.
        new |= mutexStarving
    }
    if awoke {                  // awoke = false, so the wait queue is not involved; new stays 001
        // The goroutine has been woken from sleep,
        // so we need to reset the flag in either case.
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        new &^= mutexWoken
    }

At this point new is fully built, new = 001, and m.state has to be changed to it -- this is where the lock is really taken.

    // Just to be safe, in case some other goroutine changed the lock state while new was being
    // computed, the state is set to new = 001 atomically. This is the actual locking step.
    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        if old&(mutexLocked|mutexStarving) == 0 {   // old = 000: break right here. The CAS above
                                                    // already set m.state to locked, so nothing
                                                    // below runs. (What if we got here because the
                                                    // spin budget ran out instead? That is case 2.2.)
            break // locked the mutex with CAS
        }
        // If we were already waiting before, queue at the front of the queue.
        queueLifo := waitStartTime != 0
        if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
        }
        runtime_SemacquireMutex(&m.sema, queueLifo)
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
        old = m.state
        if old&mutexStarving != 0 {
            // If this goroutine was woken and mutex is in starvation mode,
            // ownership was handed off to us but mutex is in somewhat
            // inconsistent state: mutexLocked is not set and we are still
            // accounted as waiter. Fix that.
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
                // Exit starvation mode.
                // Critical to do it here and consider wait time.
                // Starvation mode is so inefficient, that two goroutines
                // can go lock-step infinitely once they switch mutex
                // to starvation mode.
                delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
        }
        awoke = true
        iter = 0
    } else {
        old = m.state
    }
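The waiter-count arithmetic in the walkthrough above (new += 1 << mutexWaiterShift when joining the queue, delta := int32(mutexLocked - 1<<mutexWaiterShift) when a handed-off waiter fixes the state) is easier to see on a concrete state word. The snippet below is purely illustrative: it reuses the constants from the earlier sketch and is not code from sync/mutex.go.

    package main

    import "fmt"

    const (
        mutexLocked      = 1 << iota
        mutexWoken
        mutexStarving
        mutexWaiterShift = iota
    )

    func main() {
        // A goroutine fails the fast path and queues up:
        var state int32 = mutexLocked               // 0b00001: locked, no waiters
        state += 1 << mutexWaiterShift              // 0b01001: locked, one waiter
        fmt.Printf("%05b waiters=%d\n", state, state>>mutexWaiterShift)

        // Later, Unlock in starvation mode clears the locked bit and hands off:
        state = mutexStarving | 1<<mutexWaiterShift // 0b01100: unlocked, starving, one waiter
        // The woken waiter then takes the lock, removes itself from the waiter count,
        // and (being the last waiter) leaves starvation mode -- all in one atomic add.
        delta := int32(mutexLocked - 1<<mutexWaiterShift)
        delta -= mutexStarving
        state += delta                              // 0b00001: locked, normal mode, no waiters
        fmt.Printf("%05b waiters=%d\n", state, state>>mutexWaiterShift)
    }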
2.2) The other way out of the spin if is that the spin budget ran out while the lock was still held. Then new := old, so new = 001, old = 001, m.state = 001, awoke = false. (The cases where awoke was set to true inside the spin if are not discussed here -- there are too many of them....)

    // Don't try to acquire starving mutex, new arriving goroutines must queue.
    if old&mutexStarving == 0 {
        new |= mutexLocked                  // new = 001
    }
    if old&(mutexLocked|mutexStarving) != 0 {   // old = 001: the currently running goroutine is
                                                // going to join the queue. Bits 3..31 of new hold
                                                // the number of queued goroutines, so add one here.
        new += 1 << mutexWaiterShift            // new = 1001
    }
    // The current goroutine switches mutex to starvation mode.
    // But if the mutex is currently unlocked, don't do the switch.
    // Unlock expects that starving mutex has waiters, which will not
    // be true in this case.
    if starving && old&mutexLocked != 0 {   // starving = false: no need to enter starving mode
        new |= mutexStarving
    }
    if awoke {                              // awoke = false
        // The goroutine has been woken from sleep,
        // so we need to reset the flag in either case.
        if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
        }
        new &^= mutexWoken
    }

new is now initialised to 1001, old is 001.

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        if old&(mutexLocked|mutexStarving) == 0 {   // old = 001, so this if is NOT taken: the
                                                    // current goroutine cannot get the lock and
                                                    // has to block and sleep instead
            break // locked the mutex with CAS
        }
        // If we were already waiting before, queue at the front of the queue.
        queueLifo := waitStartTime != 0     // is this the first time the goroutine reaches this
                                            // point in the loop? If so, waitStartTime is still 0.
        if waitStartTime == 0 {             // queueLifo decides whether the goroutine queues at the
                                            // tail (first time) or is pushed to the head of the
                                            // queue (it was already waiting before).
            waitStartTime = runtime_nanotime()
        }
        runtime_SemacquireMutex(&m.sema, queueLifo)  // the goroutine joins the wait queue -- see
                                            // Footnote 1 for more. It blocks here, and when the
                                            // lock is released it is only woken if it is at the
                                            // head of the queue.
        starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
                                            // after waking: if it waited more than 1ms, set
                                            // starving to true -- see Footnote 2 for more.
        old = m.state
        if old&mutexStarving != 0 {
            // If this goroutine was woken and mutex is in starvation mode,
            // ownership was handed off to us but mutex is in somewhat
            // inconsistent state: mutexLocked is not set and we are still
            // accounted as waiter. Fix that.
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                throw("sync: inconsistent mutex state")
            }
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            if !starving || old>>mutexWaiterShift == 1 {
                // Exit starvation mode.
                // Critical to do it here and consider wait time.
                // Starvation mode is so inefficient, that two goroutines
                // can go lock-step infinitely once they switch mutex
                // to starvation mode.
                delta -= mutexStarving
            }
            atomic.AddInt32(&m.state, delta)
            break
        }
        awoke = true
        iter = 0
    } else {
        old = m.state
    }
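Before Footnote 1 below, here is a rough mental model of what runtime_SemacquireMutex and runtime_Semrelease do for a blocked goroutine. This is only an analogy built on a channel; the real runtime uses semtable/semaRoot, sudog queues and gopark, not channels.

    package main

    import "fmt"

    // sema is an analogy for the runtime semaphore used by Mutex: Acquire blocks the calling
    // goroutine until a token is available, Release makes one token available and so lets one
    // blocked waiter continue. The real runtime parks the goroutine with gopark and keeps the
    // waiters in a treap hanging off semaRoot.
    type sema chan struct{}

    func (s sema) Acquire() { <-s }             // blocks while the semaphore is "0"
    func (s sema) Release() { s <- struct{}{} } // lets exactly one waiter through

    func main() {
        s := make(sema, 1) // binary semaphore, like the one inside Mutex
        s.Release()        // make the token available

        s.Acquire()
        fmt.Println("got the token; a second Acquire would block until Release")
        s.Release()
        s.Acquire()
        fmt.Println("got it again")
    }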
Footnote 1

The runtime_SemacquireMutex here is a thin wrapper around semacquire1 from sema.go, described earlier. It eventually calls gopark, which hands execution back to the runtime and at the same time marks the current goroutine as sleeping so it no longer takes part in scheduling (as far as the program is concerned, it simply blocks right there).

Footnote 2

After the goroutine is woken there are again two cases.

1) It waited less than 1ms, so starving stays false.

    old = m.state   // we can only have been woken because someone called Unlock, so old is at
                    // least 000; assume exactly 000 here
    if old&mutexStarving != 0   // old is not in starving mode, so this if is skipped
    awoke = true    // reset awoke and iter and go around the loop again
    iter = 0

   On the next pass through the loop, new ends up as 001: the goroutine was woken, takes the lock, and the mutex is not starving. It finally breaks out of Lock here:

    if atomic.CompareAndSwapInt32(&m.state, old, new) {
        if old&(mutexLocked|mutexStarving) == 0 {
            break // locked the mutex with CAS
        }

2) It waited more than 1ms, so starving = true.

    old = m.state   // again we were only woken because of an Unlock; old may be 000 if nobody has
                    // grabbed the lock yet, or already locked if a newly arrived goroutine beat us to it
    if old&mutexStarving != 0   // old is not in starving mode yet, so this if is skipped
    awoke = true    // reset awoke and iter and go around the loop again
    iter = 0

   On the next pass: if the lock is still free, the goroutine simply takes it with new = 001 -- the code deliberately does not switch a currently unlocked mutex to starving ("Unlock expects that starving mutex has waiters"). If another goroutine grabbed the lock in the meantime, new also gets mutexStarving set, the mutex enters starving mode, and this goroutine queues again, this time at the front; from then on the lock is handed directly to the head of the queue.
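The starving bit itself cannot be observed from user code, but the condition that triggers it can. The toy program below is purely illustrative: it holds the lock for about 2ms at a time, so waiters routinely wait longer than starvationThresholdNs (1e6 ns, i.e. 1ms) and the mutex will switch into starvation mode internally.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var mu sync.Mutex
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for j := 0; j < 3; j++ {
                    start := time.Now()
                    mu.Lock()
                    waited := time.Since(start)
                    // Each critical section is held for ~2ms, so other goroutines
                    // routinely wait longer than the 1ms starvation threshold.
                    time.Sleep(2 * time.Millisecond)
                    mu.Unlock()
                    fmt.Printf("goroutine %d waited %v\n", id, waited)
                }
            }(i)
        }
        wg.Wait()
    }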
Part two: what difference does starving mode actually make? It mainly shows up in the Unlock function.

    // Fast path: drop lock bit.
    // First clear the locked bit. In the simplest case every other bit is 0,
    // so m.state = 000 and new = 000.
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }

    // This is where starving mode matters: if the mutex is in starving mode, go straight to the
    // else branch and wake the goroutine at the head of the queue.
    if new&mutexStarving == 0 {
        old := new                  // old = 000
        for {
            // If there are no waiters or a goroutine has already
            // been woken or grabbed the lock, no need to wake anyone.
            // In starvation mode ownership is directly handed off from unlocking
            // goroutine to the next waiter. We are not part of this chain,
            // since we did not observe mutexStarving when we unlocked the mutex above.
            // So get off the way.
            // In other words: if nobody is queued, or some goroutine has already been woken or
            // has grabbed the lock (for example a goroutine that was spinning in Lock and
            // snatched it right after the unlock above), there is no need to wake the queue.
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            // There are queued waiters and no goroutine on a CPU is spinning for the lock,
            // so seize the chance to wake one goroutine from the wait queue.
            // Grab the right to wake someone.
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // Starving mode: handoff mutex ownership to the next waiter.
        // Note: mutexLocked is not set, the waiter will set it after wakeup.
        // But mutex is still considered locked if mutexStarving is set,
        // so new coming goroutines won't acquire it.
        // Starving mode: wake the goroutine at the head of the queue directly. The Lock
        // walkthrough above did not cover what happens after runtime_SemacquireMutex(&m.sema,
        // queueLifo) returns while the mutex is in starving mode -- see Footnote 3.
        runtime_Semrelease(&m.sema, true)
    }

Footnote 3

Even though the locked bit is cleared at the top of Unlock, a goroutine running on a CPU still cannot take the lock: the starving bit in m.state tells it the mutex is in starvation mode, so only queued waiters may acquire it and any new arrival goes to the wait queue instead. When Unlock then calls runtime_Semrelease(&m.sema, true), one sleeping goroutine is woken from the head of the queue. Back in Lock, m.state at this point has the starving bit set and the woken goroutine still counted as a waiter -- for example 1100 with a single waiter (the code below even throws if the waiter count is zero):

    runtime_SemacquireMutex(&m.sema, queueLifo)  // the waiter wakes up here
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state                        // e.g. old = 1100
    if old&mutexStarving != 0 {          // the mutex is in starving mode
        // If this goroutine was woken and mutex is in starvation mode,
        // ownership was handed off to us but mutex is in somewhat
        // inconsistent state: mutexLocked is not set and we are still
        // accounted as waiter. Fix that.
        if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
        }
        delta := int32(mutexLocked - 1<<mutexWaiterShift)   // take the lock bit and remove
                                                            // ourselves from the waiter count
        if !starving || old>>mutexWaiterShift == 1 {        // if we were the last waiter (the
                                                            // queue is now empty) or we no longer
                                                            // count as starving, clear the
                                                            // starving bit as well
            // Exit starvation mode.
            // Critical to do it here and consider wait time.
            // Starvation mode is so inefficient, that two goroutines
            // can go lock-step infinitely once they switch mutex
            // to starvation mode.
            delta -= mutexStarving
        }
        atomic.AddInt32(&m.state, delta)      // apply it: lock acquired, break out
        break
    }

Summary: this post only briefly covered the mutex; there is also a read-write lock, which I will not go into here. The mutex is built on top of atomic operations (the atomic package), and I will write about atomic operations in more detail later. To finish, a few interesting questions -- the answers are not necessarily correct, and corrections from the experts are welcome.

1. A global int variable, one goroutine reading it and one goroutine writing it on a multi-core machine, nothing more: do the accesses need to be atomic operations?

   Probably not, as far as tearing goes: Intel P6-class processors guarantee at the hardware level that aligned 32-bit loads and stores are atomic, and the compiler will not compile a plain read or write of such a variable into multiple instructions. (That said, the Go memory model gives no visibility or ordering guarantees for unsynchronized access, and the race detector will flag it, so sync/atomic is still the safe choice.)

2. A global int variable i: on a multi-core machine, two goroutines both execute i++ at the same time. Is an atomic operation needed?

   Yes. i++ is a classic read-modify-write operation, and for that kind of operation a CAS-style atomic operation is needed to guarantee atomicity.

3. For a map: if the writes are protected, do the reads need protection too?

   If each access is only a read or only a write, and the value type is an integer, atomic operations should not be needed in the sense that an integer value will never be half-written or half-read. But interleavings like the following are still unavoidable: goroutine1 writes 1 into the map, goroutine2 reads 1, and while goroutine2 is still working with that value, goroutine1 overwrites it again. (Note also that the Go runtime detects unsynchronized concurrent map access and aborts the program, so in practice a map shared between goroutines needs a Mutex, RWMutex or sync.Map.)
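For question 2 the difference is easy to demonstrate. The program below is illustrative only: the plain i++ is a data race (run it with go run -race to see it reported), while atomic.AddInt64 always produces the expected total.

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
    )

    func main() {
        var plain int64
        var atomicCounter int64
        var wg sync.WaitGroup

        for g := 0; g < 2; g++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for i := 0; i < 100000; i++ {
                    plain++                            // read-modify-write: a data race, result is unpredictable
                    atomic.AddInt64(&atomicCounter, 1) // LOCK-prefixed add: always correct
                }
            }()
        }
        wg.Wait()
        fmt.Println("plain:", plain, "atomic:", atomicCounter) // atomic is always 200000
    }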