1. 程式人生 > >Go語言GC實現原理及原始碼分析

Go語言GC實現原理及原始碼分析

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/475 > > 本文使用的 Go 的原始碼1.15.7 ## 介紹 ### 三色標記法 三色標記法將物件的顏色分為了黑、灰、白,三種顏色。 - 黑色:該物件已經被標記過了,且該物件下的屬性也全部都被標記過了(程式所需要的物件); - 灰色:該物件已經被標記過了,但該物件下的屬性沒有全被標記完(GC需要從此物件中去尋找垃圾); - 白色:該物件沒有被標記過(物件垃圾); 在垃圾收集器開始工作時,從 GC Roots 開始進行遍歷訪問,訪問步驟可以分為下面幾步: 1. GC Roots 根物件會被標記成灰色; 2. 然後從灰色集合中獲取物件,將其標記為黑色,將該物件引用到的物件標記為灰色; 3. 重複步驟2,直到沒有灰色集合可以標記為止; 4. 結束後,剩下的沒有被標記的白色物件即為 GC Roots 不可達,可以進行回收。 流程大概如下: ![TRI-COLOR](https://img.luozhiyun.com/20210320234246.png) 下面我們來說說三色標記法會存在的問題。 ### 三色標記法所存在問題 #### 多標-浮動垃圾問題 假設 E 已經被標記過了(變成灰色了),此時 D 和 E 斷開了引用,按理來說物件 E/F/G 應該被回收的,但是因為 E 已經變為灰色了,其仍會被當作存活物件繼續遍歷下去。最終的結果是:這部分物件仍會被標記為存活,即本輪 GC 不會回收這部分記憶體。 這部分本應該回收 但是沒有回收到的記憶體,被稱之為“浮動垃圾”。過程如下圖所示: ![TRI-COLOR2](https://img.luozhiyun.com/20210320234259.png) #### 漏標-懸掛指標問題 除了上面多標的問題,還有就是漏標問題。當 GC 執行緒已經遍歷到 E 變成灰色,D變成黑色時,灰色 E 斷開引用白色 G ,黑色 D 引用了白色 G。此時切回 GC 執行緒繼續跑,因為 E 已經沒有對 G 的引用了,所以不會將 G 放到灰色集合。儘管因為 D 重新引用了 G,但因為 D 已經是黑色了,不會再重新做遍歷處理。 最終導致的結果是:G 會一直停留在白色集合中,最後被當作垃圾進行清除。這直接影響到了應用程式的正確性,是不可接受的,這也是 Go 需要在 GC 時解決的問題。 ![TRI-COLOR3](https://img.luozhiyun.com/20210320234309.png) ### 記憶體屏障 為了**解決**上面的懸掛指標問題,我們需要引入屏障技術來保障資料的一致性。 > A **memory barrier**, is a type of barrier instruction that causes a central processing unit (CPU) or compiler to enforce an ordering constraint on memoryoperations issued before and after the barrier instruction. This typically means that operations issued prior to the barrier are guaranteed to be performed before operations issued after the barrier. 記憶體屏障,是一種屏障指令,它能使CPU或編譯器對在該屏障指令之前和之後發出的記憶體操作強制執行**排序約束**,在記憶體屏障前執行的操作一定會先於記憶體屏障後執行的操作。 那麼為了在標記演算法中保證正確性,那麼我們需要達成下面任意一個條件: * 強三色不變性(strong tri-color invariant):黑色物件不會指向白色物件,只會指向灰色物件或者黑色物件; * 弱三色不變性(weak tri-color invariant):即便黑色物件指向白色物件,那麼從灰色物件出發,總存在一條可以找到該白色物件的路徑; 根據操作型別的不同,我們可以將記憶體屏障分成 Read barrier(讀屏障)和 Write barrier(寫屏障)兩種,在 Go 中都是使用 Write barrier(寫屏障),原因在《Uniprocessor Garbage Collection Techniques》也提到了: > If a non copying collector is used the use of a read barrier is an unnecessary expense.there is no need to protect the mutator from seeing an invalid version of a pointer. Write barrier techniques are cheaper, because heap writes are several times less common than heap reads 對於一個不需要物件拷貝的垃圾回收器來說, Read barrier(讀屏障)代價是很高的,因為對於這類垃圾回收器來說是不需要儲存讀操作的版本指標問題。相對來說 Write barrier(寫屏障)程式碼更小,因為堆中的寫操作遠遠小於堆中的讀操作。 來下面我們看看 Write barrier(寫屏障)是如何做到這一點的。 #### Dijkstra Write barrier Go 1.7 之前使用的是 Dijkstra Write barrier(寫屏障),使用的實現類似下面虛擬碼: ```go writePointer(slot, ptr): shade(ptr) *slot = ptr ``` 如果該物件是白色的話,`shade(ptr)`會將物件標記成灰色。這樣可以保證強三色不變性,它會保證 ptr 指標指向的物件在賦值給 `*slot` 前不是白色。 如下,根物件指向的 D 物件標記成黑色並將 D 物件指向的物件 E 標記成灰色;如果 D 斷開對 E 的引用,改成引用 B 物件,那麼這時觸發寫屏障將 B 物件標記成灰色。 ![TRI-COLOR4](https://img.luozhiyun.com/20210320234314.png) Dijkstra Write barrier雖然實現非常的簡單,並且也能保證強三色不變性,但是在《Proposal: Eliminate STW stack re-scanning》中也提出了它具有一些缺點: > In particular, it presents a trade-off for pointers on stacks: either writes to pointers on the stack must have write barriers, which is prohibitively expensive, or stacks must be permagrey. 因為棧上的物件在垃圾收集中也會被認為是根物件,所以要麼為棧上的物件增加寫屏障,但這會大幅度增加寫入指標的額外開銷;要麼當發生棧上的寫操作時,將棧標記為恆灰(permagrey)。 Go 1.7 的時候選擇的是將棧標記為恆灰,但需要在標記終止階段 STW 時對這些棧進行重新掃描(re-scan)。原因如下所描述: > without stack write barriers, we can‘t ensure that the stack won’t later contain a reference to a white object, so a scanned stack is only black until its goroutine executes again, at which point it conservatively reverts to grey. Thus, at the end of the cycle, the garbage collector must re-scan grey stacks to blacken them and finish marking any remaining heap pointers. #### Yuasa Write barrier Yuasa Write barrier 是 Yuasa 在《Real-time garbage collection on general-purpose machines》中提出的一種刪除屏障(deletion barrier)技術。其思想是當賦值器從灰色或白色物件中刪除白色指標時,通過寫屏障將這一行為通知給併發執行的回收器。 該演算法會使用如下所示的寫屏障保證增量或者併發執行垃圾收集時程式的正確性,虛擬碼實現如下: ``` writePointer(slot, ptr) shade(*slot) *slot = ptr ``` 為了防止丟失從灰色物件到白色物件的路徑,應該假設 *slot 可能會變為黑色, 為了確保 ptr 不會在被賦值到 *slot 前變為白色,shade(*slot) 會先將 *slot 標記為灰色, 進而該寫操作總是創造了一條灰色到灰色或者灰色到白色物件的路徑,這樣刪除寫屏障就可以保證弱三色不變性,老物件引用的下游物件一定可以被灰色物件引用。 ![YuasaWritebarrier](https://img.luozhiyun.com/20210320234320.png) #### Hybrid write barrier 上面說了在 Go 1.7 之前使用的是 Dijkstra Write barrier(寫屏障)來保證三色不變性。Go 在重新掃描的時候必須保證物件的引用不會改變,因此會進行暫停程式(STW)、將所有棧物件標記為灰色並重新掃描,這通常會消耗10~100 毫秒的時間。 通過 Proposal: Eliminate STW stack re-scanning https://go.googlesource.com/proposal/+/master/design/17503-eliminate-rescan.md 的介紹,可以知道為了消除重新掃描所帶來的效能損耗,Go 在 1.8 的時候使用 Hybrid write barrier(混合寫屏障),結合了 Yuasa write barrier 和 Dijkstra write barrier ,實現的虛擬碼如下: ``` writePointer(slot, ptr): shade(*slot) if current stack is grey: shade(ptr) *slot = ptr ``` 這樣做不僅簡化 GC 的流程,同時減少標記終止階段的重掃成本。混合寫屏障的基本思想是: > the write barrier shades the object whose reference is being overwritten, and, if the current goroutine's stack has not yet been scanned, also shades the reference being installed. 翻譯過來就是:對正在被覆蓋的物件進行著色,且如果當前棧未掃描完成, 則同樣對指標進行著色。 同時,在GC的過程中所有新分配的物件都會立刻變為黑色,在記憶體分配的時候 `go\src\runtime\malloc.go` 的 mallocgc 函式中可以看到: ```go func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer { ... dataSize := size // 獲取mcache,用於處理微物件和小物件的分配 c := gomcache() var x unsafe.Pointer // 表示物件是否包含指標,true表示物件裡沒有指標 noscan := typ == nil || typ.ptrdata == 0 // maxSmallSize=32768 32k if size <= maxSmallSize { // maxTinySize= 16 bytes if noscan && size < maxTinySize { ... } else { ... } // 大於 32 Kb 的記憶體分配,通過 mheap 分配 } else { ... } ... // 在 GC 期間分配的新物件都會被標記成黑色 if gcphase != _GCoff { gcmarknewobject(span, uintptr(x), size, scanSize) } ... return x } ``` 在垃圾收集的標記階段,將新建的物件標記成黑色,防止新分配的棧記憶體和堆記憶體中的物件被錯誤地回收。 ## 分析 ### GC phase 垃圾收集階段 GC 相關的程式碼在`runtime/mgc.go`檔案下。通過註釋介紹我們可以知道 GC 一共分為4個階段: 1. sweep termination(清理終止) 1. 會觸發 STW ,所有的 P(處理器) 都會進入 safe-point(安全點); 2. 清理未被清理的 span ,不知道什麼是 span 的同學可以看看我的:詳解Go中記憶體分配原始碼實現 https://www.luozhiyun.com/archives/434; 2. the mark phase(標記階段) 1. 將 `_GCoff` GC 狀態 改成 `_GCmark`,開啟 Write Barrier (寫入屏障)、mutator assists(協助執行緒),將根物件入隊; 2. 恢復程式執行,mark workers(標記程序)和 mutator assists(協助執行緒)會開始併發標記記憶體中的物件。對於任何指標寫入和新的指標值,都會被寫屏障覆蓋,而所有新建立的物件都會被直接標記成黑色; 3. GC 執行根節點的標記,這包括掃描所有的棧、全域性物件以及不在堆中的執行時資料結構。掃描goroutine 棧繪導致 goroutine 停止,並對棧上找到的所有指標加置灰,然後繼續執行 goroutine。 4. GC 在遍歷灰色物件佇列的時候,會將灰色物件變成黑色,並將該物件指向的物件置灰; 5. GC 會使用分散式終止演算法(distributed termination algorithm)來檢測何時不再有根標記作業或灰色物件,如果沒有了 GC 會轉為mark termination(標記終止); 3. mark termination(標記終止) 1. STW,然後將 GC 階段轉為 `_GCmarktermination`,關閉 GC 工作執行緒以及 mutator assists(協助執行緒); 2. 執行清理,如 flush mcache; 4. the sweep phase(清理階段) 1. 將 GC 狀態轉變至 `_GCoff`,初始化清理狀態並關閉 Write Barrier(寫入屏障); 2. 恢復程式執行,從此開始新建立的物件都是白色的; 3. 後臺併發清理所有的記憶體管理單元 需要注意的是,上面提到了 mutator assists,因為有一種情況: >
during the collection that the Goroutine dedicated to GC will not finish the Marking work before the heap memory in-use reaches its limit 因為 GC 標記的工作是分配 25% 的 CPU 來進行 GC 操作,所以有可能 GC 的標記工作執行緒比應用程式的分配記憶體慢,導致永遠標記不完,那麼這個時候就需要應用程式的執行緒來協助完成標記工作: > If the collector determines that it needs to slow down allocations, it will recruit the application Goroutines to assist with the Marking work. This is called a Mutator Assist. One positive side effect of Mutator Assist is that it helps to finish the collection faster. ### 下次 GC 時機 下次 GC 的時機可以通過一個環境變數 GOGC 來控制,預設是 100 ,即增長 100% 的堆記憶體才會觸發 GC。 >
This value represents a ratio of how much new heap memory can be allocated before the next collection has to start. 官方的解釋是,如果當前使用了 4M 記憶體,那麼下次 GC 將會在記憶體達到 8M 的時候。下面我們看看一個具體的例子: ```go package main import ( "fmt" ) func main() { fmt.Println("start.") container := make([]int, 8) fmt.Println(">
loop.") for i := 0; i < 32*1000*1000; i++ { container = append(container, i) } fmt.Println("< loop.") } ``` 需要注意的是,大家在做實驗的時候推薦使用 Linux 環境,如果沒有 Linux 環境可以像我一樣在 win10 下跑了一個虛擬機器,然後用 vscode 遠端到 Linux 進行實驗的,大家不妨試一下。 編譯好之後,可以使用 gctrace 跟蹤 GC 情況: ``` [root@localhost gotest]# go build main.go [root@localhost gotest]# GODEBUG=gctrace=1 ./main start. > loop. gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P gc 2 @0.006s 4%: 0.10+1.6+0.020 ms clock, 0.83+0.12/0/0+0.16 ms cpu, 4->6->1 MB, 5 MB goal, 8 P gc 3 @0.009s 16%: 0.035+5.5+2.2 ms clock, 0.28+0/0.47/0.007+18 ms cpu, 4->15->15 MB, 5 MB goal, 8 P ... < loop. ``` 上面展示了 3 次 GC 的情況,下面我們看看具體的含義是什麼: ``` gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P gc 1 :程式啟動以來第1次GC @0.004s:距離程式啟動到現在的時間 4%:當目前為止,GC 的標記工作所用的CPU時間佔總CPU的百分比 垃圾回收的時間 0.22 ms:標記開始 STW 時間 1.4 ms:併發標記時間 0.021 ms:標記終止 STW 時間 垃圾回收佔用cpu時間 1.7 ms:標記開始 STW 時間 0.009 ms:mutator assists佔用的時間 0.40 ms:標記執行緒佔用的時間 0.073 ms:idle mark workers佔用的時間 0.16 ms:標記終止 STW 時間 記憶體 4 MB:標記開始前堆佔用大小 5 MB:標記結束後堆佔用大小 1 MB:標記完成後存活堆的大小 5 MB goal:標記完成後正在使用的堆記憶體的目標大小 8 P:使用了多少處理器 ``` ![GCTrace](https://img.luozhiyun.com/20210320234328.png) 從上面的 GC 記憶體資訊中可以看到,在 GC 標記開始之前的時候堆大小是 4MB,由於標記工作是併發進行的,所以當標記完成的時候堆中被使用的大小是 5MB,這表示有 1MB 的記憶體分配是發生在 GC 期間。最後我們可以看到 GC 標記完之後存活的堆大小隻有 1MB,這也表示可以在堆佔用記憶體達到 2MB 的時候開始下一輪 GC。 從上面我們可以看到 Goal 部分的記憶體大小是 5MB,和實際的 In-Use After 部分的記憶體佔用情況相等,但是在很多複雜的情況下是不相等的,因為 Goal 部分的記憶體大小是基於當前記憶體的使用情況進行推算的。 > the goal is calculated based on the current amount of the heap memory in-use, the amount of heap memory marked as live, and timing calculations about the additional allocations that will occur while the collection is running. ### 觸發 GC 條件 觸發 GC 條件是由 `gcTrigger.test`來進行校驗的,下面我們看看 `gcTrigger.test`如何判定是否需要觸發垃圾收集: ```go func (t gcTrigger) test() bool { if !memstats.enablegc || panicking != 0 || gcphase != _GCoff { return false } switch t.kind { case gcTriggerHeap: // 堆記憶體的分配達到達控制器計算的觸發堆大小 return memstats.heap_live >= memstats.gc_trigger case gcTriggerTime: if gcpercent < 0 { return false } lastgc := int64(atomic.Load64(&memstats.last_gc_nanotime)) // 如果一定時間內沒有觸發,就會觸發新的迴圈 return lastgc != 0 && t.now-lastgc > forcegcperiod case gcTriggerCycle: // 要求啟動新一輪的GC, 已啟動則跳過 return int32(t.n-work.cycles) > 0 } return true } ``` gcTriggerTime 的觸發時間是由 forcegcperiod 決定的,預設是2分鐘。下面我們主要看看堆記憶體大小觸發 GC 的情況。 gcTriggerHeap 堆記憶體的分配達到達控制器計算的觸發堆大小,heap_live 值會在記憶體分配的時候進行計算,gc_trigger 的計算是在 `runtime.gcSetTriggerRatio`函式中進行的。 ```go func gcSetTriggerRatio(triggerRatio float64) { // gcpercent 由環境變數 GOGC 決定 if gcpercent >= 0 { // 預設是 1 scalingFactor := float64(gcpercent) / 100 // 最大的 maxTriggerRatio 是 0.95 maxTriggerRatio := 0.95 * scalingFactor if triggerRatio > maxTriggerRatio { triggerRatio = maxTriggerRatio } // 最大的 minTriggerRatio 是 0.6 minTriggerRatio := 0.6 * scalingFactor if triggerRatio < minTriggerRatio { triggerRatio = minTriggerRatio } } else if triggerRatio < 0 { triggerRatio = 0 } memstats.triggerRatio = triggerRatio trigger := ^uint64(0) if gcpercent >= 0 { // 當前標記存活的大小乘以1+係數triggerRatio trigger = uint64(float64(memstats.heap_marked) * (1 + triggerRatio)) ... } memstats.gc_trigger = trigger ... } ``` gcSetTriggerRatio 函式會根據計算出來的 triggerRatio 來獲取下次觸發 GC 的堆大小是多少。triggerRatio 每次GC後都會調整,計算 triggerRatio 的函式是 `gcControllerState.endCycle`中進行的,`gcControllerState.endCycle` 會在 MarkDone 中被呼叫的。 ```go func (c *gcControllerState) endCycle() float64 { const triggerGain = 0.5 // 目標Heap增長率 = (下次 GC 完後堆大小 - 堆存活大小)/ 堆存活大小 goalGrowthRatio := float64(memstats.next_gc-memstats.heap_marked) / float64(memstats.heap_marked) // 實際Heap增長率, 等於總大小/存活大小-1 actualGrowthRatio := float64(memstats.heap_live)/float64(memstats.heap_marked) - 1 // GC標記階段的使用時間 assistDuration := nanotime() - c.markStartTime // GC標記階段的CPU佔用率, 目標值是0.25 utilization := gcBackgroundUtilization // Add assist utilization; avoid divide by zero. if assistDuration > 0 { // assistTime 是G輔助GC標記物件所使用的時間合計 // 額外的CPU佔用率 = 輔助GC標記物件的總時間 / (GC標記使用時間 * P的數量)// 額外的CPU佔用率 = 輔助GC標記物件的總時間 / (GC標記使用時間 * P的數量) utilization += float64(c.assistTime) / float64(assistDuration*int64(gomaxprocs)) } // 觸發係數偏移值 = 目標增長率 - 原觸發係數 - CPU佔用率 / 目標CPU佔用率 * (實際增長率 - 原觸發係數) triggerError := goalGrowthRatio - memstats.triggerRatio - utilization/gcGoalUtilization*(actualGrowthRatio-memstats.triggerRatio) // 根據偏移值調整觸發係數, 每次只調整偏移值的一半 triggerRatio := memstats.triggerRatio + triggerGain*triggerError return triggerRatio } ``` 對於 triggerRatio 總體來說還是比較複雜的,我們可以根據偏離值來得知: * 實際增長率越大, 觸發係數偏移值越小, 小於0時下次觸發GC會提早; * CPU佔用率越大, 觸發係數偏移值越小, 小於0時下次觸發GC會提早; * 原觸發係數越大, 觸發係數偏移值越小, 小於0時下次觸發GC會提早; 通過上面的分析,也解釋了為什麼在 `GODEBUG=gctrace=1`分析中明明堆記憶體還沒達到 2倍卻被提前執行了,主要還是受 triggerError 偏移量的影響導致的。 ### 開始 GC 我們在測試的時候可以呼叫 `runtime.GC`來手動的觸發 GC。但實際上,觸發 GC 的入口一般不會手動呼叫。正常觸發 GC 應該是在申請記憶體時會呼叫 `runtime.mallocgc`或者是 Go 後臺的監控執行緒 sysmon 定時檢查呼叫 `runtime.forcegchelper`。 ```go func GC() { // 獲取 GC 迴圈次數 n := atomic.Load(&work.cycles) // 等待上一個迴圈的標記終止、標記和清除終止階段完成 gcWaitOnMark(n) // 觸發新一輪的 GC gcStart(gcTrigger{kind: gcTriggerCycle, n: n + 1}) // 同上 gcWaitOnMark(n + 1) // 等待清理全部待處理的記憶體管理單元 for atomic.Load(&work.cycles) == n+1 && sweepone() != ^uintptr(0) { sweep.nbgsweep++ // 讓出 P Gosched() } for atomic.Load(&work.cycles) == n+1 && atomic.Load(&mheap_.sweepers) != 0 { Gosched() } mp := acquirem() cycle := atomic.Load(&work.cycles) if cycle == n+1 || (gcphase == _GCmark && cycle == n+2) { // 將該階段的堆記憶體狀態快照發布出來( heap profile) mProf_PostSweep() } releasem(mp) } ``` 1. 首先會獲取 GC 的迴圈次數,然後呼叫 gcWaitOnMark 等待上一個迴圈的標記終止、標記和清除終止階段完成; 2. 呼叫 gcStart 觸發新一輪的 GC,並且會呼叫 gcWaitOnMark 等待當前的迴圈的標記終止、標記和清除終止階段完成; 3. 呼叫 sweepone 等待清理全部待處理的記憶體管理單元,然後呼叫 Gosched 讓出 P; 4. 完成本輪垃圾收集的清理工作後,呼叫 mProf_PostSweep 將該階段的堆記憶體狀態快照發布出來; ### GC 啟動 下圖是比較完整的GC流程,可作為看原始碼時候的導航: ![GC Start](https://img.luozhiyun.com/20210320234335.png) gcStart 函式比較長,下面分段來看看 gcStart: ```go func gcStart(trigger gcTrigger) { ... // 驗證垃圾收集條件 ,並清理已經被標記的記憶體單元 for trigger.test() && sweepone() != ^uintptr(0) { sweep.nbgsweep++ } // 獲取全域性的 startSema訊號量 semacquire(&work.startSema) // 再次驗證垃圾收集條件 if !trigger.test() { semrelease(&work.startSema) return } // 檢查是不是手動呼叫了 runtime.GC work.userForced = trigger.kind == gcTriggerCycle semacquire(&gcsema) semacquire(&worldsema) // 啟動後臺標記任務 gcBgMarkStartWorkers() // 重置標記相關的狀態 systemstack(gcResetMarkState) // work 初始化工作 work.stwprocs, work.maxprocs = gomaxprocs, gomaxprocs if work.stwprocs > ncpu { work.stwprocs = ncpu } work.heap0 = atomic.Load64(&memstats.heap_live) work.pauseNS = 0 work.mode = mode // 記錄開始時間 now := nanotime() work.tSweepTerm = now work.pauseStart = now // 暫停程式 STW systemstack(stopTheWorldWithSema) // 在併發標記前,確保清理結束 systemstack(func() { finishsweep_m() }) // 清理sched.sudogcache 以及 sync.Pools clearpools() // GC 次數 work.cycles++ // 在開始 GC 之前清理控制器的狀態,標記新一輪GC已開始 gcController.startCycle() work.heapGoal = memstats.next_gc // 設定全域性變數中的GC狀態為_GCmark // 然後啟用寫屏障 setGCPhase(_GCmark) // 初始化後臺掃描需要的狀態 gcBgMarkPrepare() // Must happen before assist enable. // 掃描棧上、全域性變數等根物件並將它們加入佇列 gcMarkRootPrepare() // 標記所有tiny alloc等待合併的物件 gcMarkTinyAllocs() // 啟用 mutator assists(協助執行緒) atomic.Store(&gcBlackenEnabled, 1) // 記錄標記開始的時間 gcController.markStartTime = now mp = acquirem() // 啟動程式,後臺任務也會開始標記堆中的物件 systemstack(func() { now = startTheWorldWithSema(trace.enabled) // 記錄停止了多久, 和標記階段開始的時間 work.pauseNS += now - work.pauseStart work.tMark = now }) semrelease(&worldsema) ... } ``` 1. 兩次呼叫 `trigger.test`檢查是否滿足垃圾收集的條件,這個函式我們在上面講過了; 2. 呼叫 `semacquire(&work.startSema) `上鎖,呼叫 `gcBgMarkStartWorkers`啟動後臺標記任務,這個我們後面重點說; 3. 對 work 結構體做初始化工作,設定垃圾收集需要的 Goroutine 數量以及已完成的GC 次數等; 4. 在開始 GC 之前呼叫 `gcController.startCycle` 清理控制器的狀態,標記新一輪GC已開始; 5. 呼叫 setGCPhase 設定全域性變數中的GC狀態為 _GCmark ,然後啟用寫屏障; 6. 呼叫 gcBgMarkPrepare 初始化後臺掃描需要的狀態; 7. 呼叫 gcMarkRootPrepare 將掃描棧上、全域性變數等根物件並將它們加入佇列; 8. 呼叫 gcMarkTinyAllocs 標記所有 tiny alloc 記憶體塊; 9. 設定 gcBlackenEnabled ,啟用 mutator assists(協助執行緒); 10. 記錄完標記開始的時間後,呼叫 startTheWorldWithSema 啟動程式,後臺任務也會開始標記堆中的物件; 下面這張圖顯示了 gcStart 過程中狀態變化,以及 STW 停頓的方法,寫屏障啟用的週期: ![GC Start2](https://img.luozhiyun.com/20210320234339.png) 上面只是粗略的說一下各個函式的作用,下面來分析一些重要的函式。 #### startCycle ```go func (c *gcControllerState) startCycle() { c.scanWork = 0 c.bgScanCredit = 0 c.assistTime = 0 c.dedicatedMarkTime = 0 c.fractionalMarkTime = 0 c.idleMarkTime = 0 // 設定 next_gc 最小值 if memstats.next_gc < memstats.heap_live+1024*1024 { memstats.next_gc = memstats.heap_live + 1024*1024 } // gcBackgroundUtilization 預設是 0.25 // 是GC所佔的P的目標值 totalUtilizationGoal := float64(gomaxprocs) * gcBackgroundUtilization // dedicatedMarkWorkersNeeded 等於P的數量的25% 加上 0.5 去掉小數點 c.dedicatedMarkWorkersNeeded = int64(totalUtilizationGoal + 0.5) utilError := float64(c.dedicatedMarkWorkersNeeded)/totalUtilizationGoal - 1 const maxUtilError = 0.3 if utilError < -maxUtilError || utilError > maxUtilError { if float64(c.dedicatedMarkWorkersNeeded) > totalUtilizationGoal { c.dedicatedMarkWorkersNeeded-- } // 是 gcMarkWorkerFractionalMode 的任務所佔的P的目標值( c.fractionalUtilizationGoal = (totalUtilizationGoal - float64(c.dedicatedMarkWorkersNeeded)) / float64(gomaxprocs) } else { c.fractionalUtilizationGoal = 0 } if debug.gcstoptheworld > 0 { c.dedicatedMarkWorkersNeeded = int64(gomaxprocs) c.fractionalUtilizationGoal = 0 } for _, p := range allp { p.gcAssistTime = 0 p.gcFractionalMarkTime = 0 } // 計算協助GC的引數 c.revise() } ``` 這裡需要注意的是 dedicatedMarkWorkersNeeded 與 fractionalUtilizationGoal 的計算過程,這個會在計算 work 工作模式的用到。 #### 標記 tiny alloc ```go func gcMarkTinyAllocs() { for _, p := range allp { // 標記各個 P 中的 mcache 中的 tiny c := p.mcache if c == nil || c.tiny == 0 { continue } _, span, objIndex := findObject(c.tiny, 0, 0) gcw := &p.gcw // 標記存活物件,並把它加到 gcwork 標記佇列 greyobject(c.tiny, 0, 0, span, gcw, objIndex) } } ``` tiny block 這個資料結構也在記憶體分配那一節講過了,這裡主要是會把所有 P 中的 mcache 中的 tiny 找到並進行標記,然後把它加到 gcwork 標記佇列,至於什麼是 gcwork 標記佇列,我們下面在執行標記的地方會講到。 #### write Barrier 寫屏障 在設定 GC 階段標記的時候會根據當前的設定的值來判斷是否需要開啟 write Barrier : ```go func setGCPhase(x uint32) { atomic.Store(&gcphase, x) writeBarrier.needed = gcphase == _GCmark || gcphase == _GCmarktermination writeBarrier.enabled = writeBarrier.needed || writeBarrier.cgo } ``` 編譯器會在`src\cmd\compile\internal\ssa\writebarrier.go`中呼叫 writebarrier 函式,就如同它的註釋所說: > // writebarrier pass inserts write barriers for store ops (Store, Move, Zero) > // when necessary (the condition above). It rewrites store ops to branches > // and runtime calls, like > // > // if writeBarrier.enabled { > // gcWriteBarrier(ptr, val) // Not a regular Go call > // } else { > // *ptr = val > // } 在執行 Store, Move, Zero 等彙編操作的時候加入寫屏障。 我們可以通過 dlv 斷點找到 gcWriteBarrier 彙編程式碼的位置在 `go/src/runtime/asm_amd64.s:1395`。該彙編函式會呼叫 `runtime.wbBufFlush`將 write barrier 的快取任務新增到 GC 的工作佇列中進行處理。 ```go func wbBufFlush(dst *uintptr, src uintptr) { ... systemstack(func() { ... wbBufFlush1(getg().m.p.ptr()) }) } func wbBufFlush1(_p_ *p) { // 獲取快取的指標 start := uintptr(unsafe.Pointer(&_p_.wbBuf.buf[0])) n := (_p_.wbBuf.next - start) / unsafe.Sizeof(_p_.wbBuf.buf[0]) ptrs := _p_.wbBuf.buf[:n] _p_.wbBuf.next = 0 gcw := &_p_.gcw pos := 0 for _, ptr := range ptrs { // 查詢到物件 obj, span, objIndex := findObject(ptr, 0, 0) if obj == 0 { continue } mbits := span.markBitsForIndex(objIndex) // 判斷是否已被標記 if mbits.isMarked() { continue } // 進行標記 mbits.setMarked() // 標記 span. arena, pageIdx, pageMask := pageIndexOf(span.base()) if arena.pageMarks[pageIdx]&pageMask == 0 { atomic.Or8(&arena.pageMarks[pageIdx], pageMask) } if span.spanclass.noscan() { gcw.bytesMarked += uint64(span.elemsize) continue } ptrs[pos] = obj pos++ } // 將物件加入到 gcWork佇列中 gcw.putBatch(ptrs[:pos]) // 重置 write barrier 快取 _p_.wbBuf.reset() } ``` 寫屏障這裡其實也是和併發標記是一樣的套路,可以看完併發標記再過來看。wbBufFlush1 會遍歷write barrier 快取,然後呼叫 findObject 查詢到物件之後使用標誌位進行標記,最後將物件加入到 gcWork佇列中進行掃描,並 重置 write barrier 快取。 #### stopTheWorldWithSema 與 startTheWorldWithSema stopTheWorldWithSema 與 startTheWorldWithSema 是一對用於暫停和恢復程式的核心函式。 ```go func stopTheWorldWithSema() { _g_ := getg() lock(&sched.lock) sched.stopwait = gomaxprocs // 標記 gcwaiting,排程時看見此標記會進入等待 atomic.Store(&sched.gcwaiting, 1) // 傳送搶佔訊號 preemptall() // 暫停當前 P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // 遍歷所有的 P ,修改 P 的狀態為 _Pgcstop 停止執行 for _, p := range allp { s := p.status if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ sched.stopwait-- } } // 停止空閒的 P 列表 for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait-- } wait := sched.stopwait > 0 unlock(&sched.lock) if wait { for { // 等待 100 us if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } // 再次進行傳送搶佔訊號 preemptall() } } // 安全檢測 bad := "" if sched.stopwait != 0 { bad = "stopTheWorld: not stopped (stopwait != 0)" } else { for _, p := range allp { if p.status != _Pgcstop { bad = "stopTheWorld: not stopped (status != _Pgcstop)" } } } if atomic.Load(&freezing) != 0 { lock(&deadlock) lock(&deadlock) } if bad != "" { throw(bad) } } ``` 這個方法會通過` sched.stopwait`來檢測是否所有的 P 都已暫停。首先會通過呼叫 preemptall 傳送搶佔訊號進行搶佔所有執行中的 G,然後遍歷 P 將所有狀態為 _Psyscall、空閒的 P 都暫停,如果仍有需要停止的P, 則等待它們停止。 ```go func startTheWorldWithSema(emitTraceEvent bool) int64 { mp := acquirem() // disable preemption because it can be holding p in a local var // 判斷收到的 netpoll 事件並新增對應的G到待執行佇列 if netpollinited() { list := netpoll(0) // non-blocking injectglist(&list) } lock(&sched.lock) procs := gomaxprocs if newprocs != 0 { procs = newprocs newprocs = 0 } // 擴容或者縮容全域性的處理器 p1 := procresize(procs) // 取消GC等待標記 sched.gcwaiting = 0 // 如果 sysmon (後臺監控執行緒) 在等待則喚醒它 if sched.sysmonwait != 0 { sched.sysmonwait = 0 notewakeup(&sched.sysmonnote) } unlock(&sched.lock) // 喚醒有可執行任務的P for p1 != nil { p := p1 p1 = p1.link.ptr() if p.m != 0 { mp := p.m.ptr() p.m = 0 if mp.nextp != 0 { throw("startTheWorld: inconsistent mp->nextp") } mp.nextp.set(p) notewakeup(&mp.park) } else { // Start M to run P newm(nil, p, -1) } } startTime := nanotime() if emitTraceEvent { traceGCSTWDone() } // 如果有空閒的P,並且沒有自旋中的M則喚醒或者建立一個M wakep() releasem(mp) return startTime } ``` startTheWorldWithSema 就顯得簡單的多,首先從 netpoller 中獲取待處理的任務並加入全域性佇列;然後遍歷 P 連結串列,喚醒有可執行任務的P。 ### 建立後臺標記 Worker ```go func gcBgMarkStartWorkers() { // 遍歷所有 P for _, p := range allp { // 如果已啟動則不重複啟動 if p.gcBgMarkWorker == 0 { // 為全域性每個處理器建立用於執行後臺標記任務的 Goroutine go gcBgMarkWorker(p) // 啟動後等待該任務通知訊號量 bgMarkReady 再繼續 notetsleepg(&work.bgMarkReady, -1) noteclear(&work.bgMarkReady) } } } ``` gcBgMarkStartWorkers 會為全域性每個 P 建立用於執行後臺標記任務的 Goroutine,每一個 Goroutine 都會執行 gcBgMarkWorker,notetsleepg 會等待 gcBgMarkWorker 通知訊號量 bgMarkReady 再繼續。 這裡雖然為每個 P 啟動了一個後臺標記任務, 但是可以同時工作的只有 25%,排程器在排程迴圈 `runtime.schedule`中通過呼叫 `gcController.findRunnableGCWorker`方法進行控制。 在看這個方法之前,先來了解一個概念, Mark Worker Mode 標記工作模式,目前來說有三種,這三種是為了保證後臺的標記執行緒的利用率。 ```go type gcMarkWorkerMode int const ( // gcMarkWorkerDedicatedMode indicates that the P of a mark // worker is dedicated to running that mark worker. The mark // worker should run without preemption. gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota // gcMarkWorkerFractionalMode indicates that a P is currently // running the "fractional" mark worker. The fractional worker // is necessary when GOMAXPROCS*gcBackgroundUtilization is not // an integer. The fractional worker should run until it is // preempted and will be scheduled to pick up the fractional // part of GOMAXPROCS*gcBackgroundUtilization. gcMarkWorkerFractionalMode // gcMarkWorkerIdleMode indicates that a P is running the mark // worker because it has nothing else to do. The idle worker // should run until it is preempted and account its time // against gcController.idleMarkTime. gcMarkWorkerIdleMode ) ``` 通過程式碼註釋可以知道: * gcMarkWorkerDedicatedMode :P 專門負責標記物件,不會被排程器搶佔; * gcMarkWorkerFractionalMode:主要是由於現在預設標記執行緒的佔用率要為 25%,所以如果 CPU 核數不是4的倍數,就無法除得整數,啟動該型別的工作模式幫助垃圾收集達到利用率的目標; * gcMarkWorkerIdleMode:表示 P 當前只有標記執行緒在跑,沒有其他可以執行的 G ,它會執行垃圾收集的標記任務直到被搶佔; ```go func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g { ... // 原子減少對應的值, 如果減少後大於等於0則返回true, 否則返回false decIfPositive := func(ptr *int64) bool { if *ptr > 0 { if atomic.Xaddint64(ptr, -1) >= 0 { return true } // We lost a race atomic.Xaddint64(ptr, +1) } return false } // 減少dedicatedMarkWorkersNeeded, 成功時後臺標記任務的模式是Dedicated if decIfPositive(&c.dedicatedMarkWorkersNeeded) { _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode } else if c.fractionalUtilizationGoal == 0 { // No need for fractional workers. return nil } else { // 執行標記任務的時間 delta := nanotime() - gcController.markStartTime if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal { // Nope. No need to run a fractional worker. return nil } _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode } gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) return gp } ``` 在 findRunnableGCWorker 會通過 dedicatedMarkWorkersNeeded 來決定是否採用 gcMarkWorkerDedicatedMode 的 Mark Worker Mode 標記工作模式。dedicatedMarkWorkersNeeded 是在 `gcControllerState.startCycle`中進行初始化。 公式我就不貼了,在 `gcControllerState.startCycle`已經講過了,通俗來說如果當前是 8 核 CPU,那麼 dedicatedMarkWorkersNeeded 為 2 ,如果是 6 核 CPU,因為無法被 4 整除,計算得 dedicatedMarkWorkersNeeded 為 1,所以需要上面得 gcMarkWorkerFractionalMode 模式來保證 CPU 的利用率。 gcMarkWorkerIdleMode 會在排程器執行 findrunnable 搶佔的時候呼叫: ```go func findrunnable() (gp *g, inheritTime bool) { ... stop: // 處於 GC 階段的話,獲取執行GC標記任務的G if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := _p_.gcBgMarkWorker.ptr() //將本地 P 的 GC 標記專用 G 職位 Grunnable casgstatus(gp, _Gwaiting, _Grunnable) return gp, false } ... } ``` 看過我的《詳解Go語言排程迴圈原始碼實現》的同學應該都知道,搶佔排程執行到這裡的時候,通常是 P 搶佔不到 G 了,打算進行休眠了,因此在休眠之前可以安全的進行標記任務的執行。 沒看過排程迴圈的同學可以看這裡:詳解Go語言排程迴圈原始碼實現 https://www.luozhiyun.com/archives/448 。 ### 併發掃描標記 併發掃描標記可以大概概括為以下幾個部分: 1. 將當前傳入的 P 打包成 parkInfo ,然後呼叫 gopark 讓當前 G 進入休眠,在休眠前會將 P 的 gcBgMarkWorker 與 G 進行繫結,等待喚醒; 2. 根據 Mark Worker Mode 呼叫不同的策略呼叫 gcDrain 執行標記; 3. 判斷是否所有後臺標記任務都完成, 並且沒有更多的任務,呼叫 gcMarkDone 準備進入完成標記階段; #### 後臺標記休眠等待 ```go func gcBgMarkWorker(_p_ *p) { gp := getg() type parkInfo struct { m muintptr attach puintptr } gp.m.preemptoff = "GC worker init" // 初始化 park park := new(parkInfo) gp.m.preemptoff = "" // 設定當前的M並禁止搶佔 park.m.set(acquirem()) // 設定當前的P park.attach.set(_p_) // 通知gcBgMarkStartWorkers可以繼續處理 notewakeup(&work.bgMarkReady) for { // 讓當前 G 進入休眠 gopark(func(g *g, parkp unsafe.Pointer) bool { park := (*parkInfo)(parkp) releasem(park.m.ptr()) // 設定關聯的 P if park.attach != 0 { p := park.attach.ptr() park.attach.set(nil) // 把當前的G設到P的gcBgMarkWorker成員 if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) { return false } } return true }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0) ... } } ``` 在 gcBgMarkStartWorkers 中我們看到,它會遍歷所有的 P ,然後為每個 P 建立一個負責 Mark Work 的 G,這裡雖然為每個 P 啟動了一個後臺標記任務, 但是不可能每個 P 都會去執行標記任務,後臺標記任務預設資源佔用率是 25%,所以 gcBgMarkWorker 中會初始化 park 並將 G 和 P 的 gcBgMarkWorker 進行繫結後進行休眠。 排程器在排程迴圈 `runtime.schedule`中通過呼叫 `gcController.findRunnableGCWorker`方法進行控制,讓哪些 Mark Work 可以執行,上面程式碼已經貼過了,這裡就不重複了。 ![WorkMark](https://img.luozhiyun.com/20210320234350.png) #### 後臺標記 在喚醒後,我們會根據 gcMarkWorkerMode 選擇不同的標記執行策略,不同的執行策略都會呼叫 `runtime.gcDrain` : ```go func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 檢查P的gcBgMarkWorker是否和當前的G一致, 不一致時結束當前的任務 if _p_.gcBgMarkWorker.ptr() != gp { break } // 禁止G被搶佔 park.m.set(acquirem()) // 記錄開始時間 startTime := nanotime() _p_.gcMarkWorkerStartTime = startTime decnwait := atomic.Xadd(&work.nwait, -1) systemstack(func() { // 設定G的狀態為等待中這樣它的棧可以被掃描 casgstatus(gp, _Grunning, _Gwaiting) // 判斷後臺標記任務的模式 switch _p_.gcMarkWorkerMode { default: throw("gcBgMarkWorker: unexpected gcMarkWorkerMode") case gcMarkWorkerDedicatedMode: // 這個模式下P應該專心執行標記 gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit) if gp.preempt { // 被搶佔時把本地執行佇列中的所有G都踢到全域性執行佇列 lock(&sched.lock) for { gp, _ := runqget(_p_) if gp == nil { break } globrunqput(gp) } unlock(&sched.lock) } // 繼續執行標記 gcDrain(&_p_.gcw, gcDrainFlushBgCredit) case gcMarkWorkerFractionalMode: // 執行標記 gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit) case gcMarkWorkerIdleMode: // 執行標記, 直到被搶佔或者達到一定的量 gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit) } // 恢復G的狀態到執行中 casgstatus(gp, _Gwaiting, _Grunning) }) ... } } ``` 在上面已經講了不同的 Mark Worker Mode 的區別,不記得的同學可以往上翻一下。執行標記這部分主要在 switch 判斷中,根據不同的模式傳入不同的引數到 gcDrain 函式中執行。 需要注意的是,傳入到 gcDrain 中的是一個 gcWork 的結構體,它相當於每個 P 的私有快取空間,存放需要被掃描的物件,為垃圾收集器提供了生產和消費任務的抽象,,該結構體持有了兩個重要的工作緩衝區 `wbuf1` 和 `wbuf2`: ![gcWork2](https://img.luozhiyun.com/20210320234354.png) 當我們向該結構體中增加或者刪除物件時,它總會先操作 `wbuf1` 緩衝區,一旦 `wbuf1` 緩衝區空間不足或者沒有物件,會觸發緩衝區的切換,而當兩個緩衝區空間都不足或者都為空時,會從全域性的工作緩衝區中插入或者獲取物件: ```go func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 ... // wbuf1緩衝區無資料時 if wbuf.nobj == 0 { // wbuf1 與 wbuf2 進行物件互換 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 wbuf = w.wbuf1 if wbuf.nobj == 0 { owbuf := wbuf // 從 work 的 full 佇列中獲取 wbuf = trygetfull() ... } } wbuf.nobj-- return wbuf.obj[wbuf.nobj] } ``` 繼續上面的 gcBgMarkWorker 方法,在標記完之後就要進行標記完成: ```go func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 累加所用時間 duration := nanotime() - startTime switch _p_.gcMarkWorkerMode { case gcMarkWorkerDedicatedMode: atomic.Xaddint64(&gcController.dedicatedMarkTime, duration) atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1) case gcMarkWorkerFractionalMode: atomic.Xaddint64(&gcController.fractionalMarkTime, duration) atomic.Xaddint64(&_p_.gcFractionalMarkTime, duration) case gcMarkWorkerIdleMode: atomic.Xaddint64(&gcController.idleMarkTime, duration) } incnwait := atomic.Xadd(&work.nwait, +1) // 判斷是否所有後臺標記任務都完成, 並且沒有更多的任務 if incnwait == work.nproc && !gcMarkWorkAvailable(nil) { // 取消和P的關聯 _p_.gcBgMarkWorker.set(nil) // 允許G被搶佔 releasem(park.m.ptr()) // 準備進入完成標記階段 gcMarkDone() // 休眠之前會重新關聯P // 因為上面允許被搶佔, 到這裡的時候可能就會變成其他P // 如果重新關聯P失敗則這個任務會結束 park.m.set(acquirem()) park.attach.set(_p_) } } } ``` gcBgMarkWorker 會根據 incnwait 來檢查是否是最後一個 worker,然後呼叫 gcMarkWorkAvailable 函式來校驗 gcwork的任務和全域性任務是否已經全部都處理完了,如果都確認沒問題,那麼呼叫 gcMarkDone 進入完成標記階段。 #### 標記掃描 下面我們來看看 gcDrain: ```go func gcDrain(gcw *gcWork, flags gcDrainFlags) { gp := getg().m.curg // 看到搶佔標誌時是否要返回 preemptible := flags&gcDrainUntilPreempt != 0 // 是否計算後臺的掃描量來減少協助執行緒和喚醒等待中的G flushBgCredit := flags&gcDrainFlushBgCredit != 0 // 是否只執行一定量的工作 idle := flags&gcDrainIdle != 0 // 記錄初始的已掃描數量 initScanWork := gcw.scanWork checkWork := int64(1<<63 - 1) var check func() bool if flags&(gcDrainIdle|gcDrainFractional) != 0 { // drainCheckThreshold 預設 100000 checkWork = initScanWork + drainCheckThreshold if idle { check = pollWork } else if flags&gcDrainFractional != 0 { check = pollFractionalWorkerExit } } // 如果根物件未掃描完, 則先掃描根物件 if work.markrootNext < work.markrootJobs { // 一直迴圈直到被搶佔或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 從根物件掃描佇列取出一個值 job := atomic.Xadd(&work.markrootNext, +1) - 1 if job >= work.markrootJobs { break } // 執行根物件掃描工作 markroot(gcw, job) if check != nil && check() { goto done } } } ... } ``` gcDrain 函式在開始的時候,會根據 flags 不同而選擇不同的策略。 * gcDrainUntilPreempt:當 G 被搶佔時返回; * gcDrainIdle:呼叫 `runtime.pollWork`,當 P 上包含其他待執行 G 時返回; * gcDrainFractional:呼叫 `runtime.pollFractionalWorkerExit`,當 CPU 的佔用率超過 `fractionalUtilizationGoal` 的 20% 時返回; 設定完 check 變數後就可以執行 `runtime.markroot`進行根物件掃描,每次掃描完畢都會呼叫 check 函式校驗是否應該退出標記任務,如果是那麼就跳到 done 程式碼塊中退出標記。 完成標記後會獲取待執行的任務: ```go func gcDrain(gcw *gcWork, flags gcDrainFlags) { ... // 根物件已經在標記佇列中, 消費標記佇列 // 一直迴圈直到被搶佔或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 將本地一部分工作放回全域性佇列中 if work.full == 0 { gcw.balance() } // 獲取任務 b := gcw.tryGetFast() if b == 0 { b = gcw.tryGet() if b == 0 { wbBufFlush(nil, 0) b = gcw.tryGet() } } // 獲取不到物件, 標記佇列已為空, 跳出迴圈 if b == 0 { break } // 掃描獲取到的物件 scanobject(b, gcw) // 如果已經掃描了一定數量的物件,gcCreditSlack值是2000 if gcw.scanWork >= gcCreditSlack { // 把掃描的物件數量新增到全域性 atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 記錄這次掃描的記憶體位元組數用於減少輔助標記的工作量 gcFlushBgCredit(gcw.scanWork - initScanWork) initScanWork = 0 } checkWork -= gcw.scanWork gcw.scanWork = 0 if checkWork <= 0 { checkWork += drainCheckThreshold if check != nil && check() { break } } } } done: // 把掃描的物件數量新增到全域性 if gcw.scanWork > 0 { atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 記錄這次掃描的記憶體位元組數用於減少輔助標記的工作量 gcFlushBgCredit(gcw.scanWork - initScanWork) } gcw.scanWork = 0 } } ``` 這裡在獲取快取佇列之前會呼叫 `runtime.gcWork.balance`,會將 gcWork 快取一部分工作放回全域性佇列中,這個方法主要是用來平衡一下不同 P 的負載情況。 然後獲取 gcWork 的快取任務,並將獲取到的任務交給 scanobject 執行,該函式會從傳入的位置開始掃描,並會給找到的活躍物件上色。`runtime.gcFlushBgCredit` 會記錄這次掃描的記憶體位元組數用於減少輔助標記的工作量。 這裡我來總結一下 gcWork 出入隊情況。gcWork 的出隊就是我們上面的 scanobject 方法,會獲取到 gcWork 快取物件並執行,但是同時如果找到活躍物件也會再次的入隊到 gcWork 中。 除了 scanobject 以外,寫屏障、根物件掃描和棧掃描都會向 gcWork 中增加額外的灰色物件等待處理。 ![GcWork](https://img.luozhiyun.com/20210320234359.png) **根標記** ```go func markroot(gcw *gcWork, i uint32) { baseFlushCache := uint32(fixedRootCount) baseData := baseFlushCache + uint32(work.nFlushCacheRoots) baseBSS := baseData + uint32(work.nDataRoots) baseSpans := baseBSS + uint32(work.nBSSRoots) baseStacks := baseSpans + uint32(work.nSpanRoots) end := baseStacks + uint32(work.nStackRoots) switch { // 釋放mcache中的所有span, 要求STW case baseFlushCache <= i && i < baseData: flushmcache(int(i - baseFlushCache)) // 掃描可讀寫的全域性變數 case baseData <= i && i < baseBSS: for _, datap := range activeModules() { markrootBlock(datap.data, datap.edata-datap.data, datap.gcdatamask.bytedata, gcw, int(i-baseData)) } // 掃描未初始化的全域性變數 case baseBSS <= i && i < baseSpans: for _, datap := range activeModules() { markrootBlock(datap.bss, datap.ebss-datap.bss, datap.gcbssmask.bytedata, gcw, int(i-baseBSS)) } // 掃描 finalizers 佇列 case i == fixedRootFinalizers: for fb := allfin; fb != nil; fb = fb.alllink { cnt := uintptr(atomic.Load(&fb.cnt)) scanblock(uintptr(unsafe.Pointer(&fb.fin[0])), cnt*unsafe.Sizeof(fb.fin[0]), &finptrmask[0], gcw, nil) } // 釋放已中止的 G 的棧 case i == fixedRootFreeGStacks: systemstack(markrootFreeGStacks) // 掃描 MSpan.specials case baseSpans <= i && i < baseStacks: markrootSpans(gcw, int(i-baseSpans)) // 掃描各個 G 的棧 default: // 獲取需要掃描的 G var gp *g if baseStacks <= i && i < end { gp = allgs[i-baseStacks] } else { throw("markroot: bad index") } // 記錄等待開始的時間 status := readgstatus(gp) // We are not in a scan state if (status == _Gwaiting || status == _Gsyscall) && gp.waitsince == 0 { gp.waitsince = work.tstart } // 轉交給g0進行掃描 systemstack(func() { userG := getg().m.curg selfScan := gp == userG && readgstatus(userG) == _Grunning // 如果是掃描自己的,則轉換自己的g的狀態 if selfScan { casgstatus(userG, _Grunning, _Gwaiting) userG.waitreason = waitReasonGarbageCollectionScan } // 掛起 G,讓對應的 G 停止執行 stopped := suspendG(gp) if stopped.dead { gp.gcscandone = true return } if gp.gcscandone { throw("g already scanned") } // 掃描g的棧 scanstack(gp, gcw) gp.gcscandone = true resumeG(stopped) if selfScan { casgstatus(userG, _Gwaiting, _Grunning) } }) } } ``` 看到上面掃描的BSS和Date相關的記憶體塊的時候我也是感到非常的疑惑,我們結合維基百科 Data segment https://en.wikipedia.org/wiki/Data_segment 的解釋可以看到: > The *.data* segment contains any global or static variables which have a pre-defined value and can be modified. > > The BSS segment, also known as *uninitialized data*, is usually adjacent to the data segment. Data 段通常是提前被初始化的全域性變數,BSS 段通常是沒有被初始化的資料。 因為涉及到太多快取、資料段、棧記憶體的掃,很多位操作和指標操作,相關程式碼實現比較複雜。下面簡單看看 scanblock,scanstack。 **scanblock** ```go func scanblock(b0, n0 uintptr, ptrmask *uint8, gcw *gcWork, stk *stackScanState) { b := b0 n := n0 // 遍歷掃描的地址 for i := uintptr(0); i < n; { // 找到bitmap中對應的byte bits := uint32(*addb(ptrmask, i/(sys.PtrSize*8))) if bits == 0 { i += sys.PtrSize * 8 continue } // 遍歷 byte for j := 0; j < 8 && i < n; j++ { // 如果該地址包含指標 if bits&1 != 0 { p := *(*uintptr)(unsafe.Pointer(b + i)) if p != 0 { // 標記在該地址的物件存活, 並把它加到標記佇列 if obj, span, objIndex := findObject(p, b, i); obj != 0 { greyobject(obj, b, i, span, gcw, objIndex) } else if stk != nil && p >= stk.stack.lo && p < stk.stack.hi { stk.putPtr(p, false) } } } bits >>= 1 i += sys.PtrSize } } } ``` **scanstack** ```go func scanstack(gp *g, gcw *gcWork) { ... // 判斷是否可以安全的進行 收縮棧 if isShrinkStackSafe(gp) { // Shrink the stack if not much of it is being used. // 收縮棧 shrinkstack(gp) } else { // Otherwise, shrink the stack at the next sync safe point. // 否則下次安全點再進行收縮棧 gp.preemptShrink = true } var state stackScanState state.stack = gp.stack if gp.sched.ctxt != nil { scanblock(uintptr(unsafe.Pointer(&gp.sched.ctxt)), sys.PtrSize, &oneptrmask[0], gcw, &state) } scanframe := func(frame *stkframe, unused unsafe.Pointer) bool { scanframeworker(frame, &state, gcw) return true } // 列舉所有呼叫幀 gentraceback(^uintptr(0), ^uintptr(0), 0, gp, 0, nil, 0x7fffffff, scanframe, nil, 0) // 列舉所有defer的呼叫幀 tracebackdefers(gp, scanframe, nil) // Find and trace other pointers in defer records. // 掃描defer中的程式碼塊 for d := gp._defer; d != nil; d = d.link { ... } if gp._panic != nil { state.putPtr(uintptr(unsafe.Pointer(gp._panic)), false) } // 掃描並找到所有可達的棧物件 state.buildIndex() for { p, conservative := state.getPtr() if p == 0 { break } obj := state.findObject(p) if obj == nil { continue } t := obj.typ // 已被掃描過 if t == nil { continue } // 標記掃描 obj.setType(nil) gcdata := t.gcdata var s *mspan if t.kind&kindGCProg != 0 { s = materializeGCProg(t.ptrdata, gcdata) gcdata = (*byte)(unsafe.Pointer(s.startAddr)) } b := state.stack.lo + uintptr(obj.off) if conservative { scanConservative(b, t.ptrdata, gcdata, gcw, &state) } else { scanblock(b, t.ptrdata, gcdata, gcw, &state) } if s != nil { dematerializeGCProg(s) } } for state.head != nil { x := state.head state.head = x.next x.nobj = 0 putempty((*workbuf)(unsafe.Pointer(x))) } if state.buf != nil || state.cbuf != nil || state.freeBuf != nil { throw("remaining pointer buffers") } } ``` **greyobject** ```go func greyobject(obj, base, off uintptr, span *mspan, gcw *gcWork, objIndex uintptr) { // obj should be start of allocation, and so must be at least pointer-aligned. if obj&(sys.PtrSize-1) != 0 { throw("greyobject: obj not pointer-aligned") } mbits := span.markBitsForIndex(objIndex) // 檢查是否所有可到達的物件都被正確標記的機制, 僅除錯使用 if useCheckmark { ... } else { ... // 被標記過了直接返回 if mbits.isMarked() { return } // 設定標記 mbits.setMarked() // 標記 span arena, pageIdx, pageMask := pageIndexOf(span.base()) if arena.pageMarks[pageIdx]&pageMask == 0 { atomic.Or8(&arena.pageMarks[pageIdx], pageMask) } // span的型別是noscan, 則不需要把物件放入標記佇列 if span.spanclass.noscan() { gcw.bytesMarked += uint64(span.elemsize) return } } // 嘗試存入gcwork的快取中,或全域性佇列中 if !gcw.putFast(obj) { gcw.put(obj) } } ``` **物件掃描** ```go func scanobject(b uintptr, gcw *gcWork) { // 獲取 b 的 heapBits 物件 hbits := heapBitsForAddr(b) // 獲取 span s := spanOfUnchecked(b) // span 對應的物件大小 n := s.elemsize if n == 0 { throw("scanobject n == 0") } // 每次最大隻掃描128KB if n > maxObletBytes { // Large object. Break into oblets for better // parallelism and lower latency. if b == s.base() { if s.spanclass.noscan() { // Bypass the whole scan. gcw.bytesMarked += uint64(n) return } // 把多於128KB的物件重新放回gcworker中,下次再掃描 for oblet := b + maxObletBytes; oblet < s.base()+s.elemsize; oblet += maxObletBytes { if !gcw.putFast(oblet) { gcw.put(oblet) } } } n = s.base() + s.elemsize - b if n > maxObletBytes { n = maxObletBytes } } var i uintptr for i = 0; i < n; i += sys.PtrSize { // 獲取對應的bit // Find bits for this word. if i != 0 { hbits = hbits.next() } bits := hbits.bits() // 檢查scan bit判斷是否繼續掃描 if i != 1*sys.PtrSize && bits&bitScan == 0 { break // no more pointers in this object } // 如果不是指標則繼續 if bits&bitPointer == 0 { continue // not a pointer } // 取出指標的值 obj := *(*uintptr)(unsafe.Pointer(b + i)) if obj != 0 && obj-b >= n { // 根據地址值去堆中查詢物件 if obj, span, objIndex := findObject(obj, b, i); obj != 0 { // 呼叫 greyobject 標記物件並把物件放到標記佇列中 greyobject(obj, b, i, span, gcw, objIndex) } } } gcw.bytesMarked += uint64(n) gcw.scanWork += int64(i) } ``` #### 輔助標記 mutator assists 在分析的一開始也提到了一些關於 mutator assists 的作用,主要是為了防止 heap 增速太快, 在GC 執行的過程中如果同時執行的 G 分配了記憶體, 那麼這個 G 會被要求輔助 GC 做一部分的工作,它遵循一條非常簡單並且樸實的原則,**分配多少記憶體就需要完成多少標記任務**。 mutator assists 的入口是在 `go\src\runtime\malloc.go` 的mallocgc 函式中: ```go func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer { ... var assistG *g if gcBlackenEnabled != 0 { assistG = getg() if assistG.m.curg != nil { assistG = assistG.m.curg } // 減去記憶體值 assistG.gcAssistBytes -= int64(size) if assistG.gcAssistBytes < 0 { // This G is in debt. gcAssistAlloc(assistG) } } ... return x } ``` mallocgc 在分配記憶體的時候每次都會檢查 gcAssistBytes 欄位是否為負值,這個欄位儲存了當前 Goroutine 輔助標記的物件位元組數。如果為負數,那麼會呼叫 gcAssistAlloc 從全域性信用 bgScanCredit 中獲取: ```go func gcAssistAlloc(gp *g) { ... retry: // 計算需要完成的標記任務數量 debtBytes := -gp.gcAssistBytes scanWork := int64(gcController.assistWorkPerByte * float64(debtBytes)) if scanWork < gcOverAssistWork { scanWork = gcOverAssistWork debtBytes = int64(gcController.assistBytesPerWork * float64(scanWork)) } // 獲取全域性輔助標記的位元組數 bgScanCredit := atomic.Loadint64(&gcController.bgScanCredit) stolen := int64(0) if bgScanCredit > 0 { if bgScanCredit < scanWork { stolen = bgScanCredit gp.gcAssistBytes += 1 + int64(gcController.assistBytesPerWork*float64(stolen)) } else { stolen = scanWork gp.gcAssistBytes += debtBytes } // 全域性信用扣除stolen點數 atomic.Xaddint64(&gcController.bgScanCredit, -stolen) scanWork -= stolen // 減到 0 說明 bgScanCredit 是由足夠的信用可以處理 scanWork if scanWork == 0 { return } } // 到這裡說明 bgScanCredit 小於 scanWork // 需要呼叫 gcDrainN 完成指定數量的標記任務並返回 systemstack(func() { // 執行標記任務 gcAssistAlloc1(gp, scanWork) }) completed := gp.param != nil gp.param = nil if completed { gcMarkDone() } if gp.gcAssistBytes < 0 { if gp.preempt { Gosched() goto retry } // 如果全域性信用仍然不足將當前 Goroutine 陷入休眠 // 加入全域性的輔助標記佇列並等待後臺標記任務的喚醒 if !gcParkAssist() { goto retry } } } ``` 如果全域性信用仍然不足將當前 Goroutine 陷入休眠 ,加入全域性的輔助標記佇列並等待後臺標記任務的喚醒。 掃描記憶體時呼叫 gcFlushBgCredit 會負責喚醒輔助標記 Goroutine : ```go func gcFlushBgCredit(scanWork int64) { // 輔助佇列中不存在等待的 Goroutine if work.assistQueue.q.empty() { // 當前的信用會直接加到全域性信用 bgScanCredit atomic.Xaddint64(&gcController.bgScanCredit, scanWork) return } scanBytes := int64(float64(scanWork) * gcController.assistBytesPerWork) lock(&work.assistQueue.lock) // 如果輔助佇列不為空 for !work.assistQueue.q.empty() && scanBytes > 0 { gp := work.assistQueue.q.pop() // 喚醒 Goroutine if scanBytes+gp.gcAssistBytes >= 0 { scanBytes += gp.gcAssistBytes gp.gcAssistBytes = 0 ready(gp, 0, false) } else { gp.gcAssistBytes += scanBytes scanBytes = 0 work.assistQueue.q.pushBack(gp) break } } // 標記任務量仍然有剩餘,這些標記任務都會加入全域性信用 if scanBytes > 0 { scanWork = int64(float64(scanBytes) * gcController.assistWorkPerByte) atomic.Xaddint64(&gcController.bgScanCredit, scanWork) } unlock(&work.assistQueue.lock) } ``` gcFlushBgCredit 會獲取睡眠的輔助佇列 Goroutine ,如果當前信用足夠,那麼就會將輔助 Goroutine 喚醒,如果還有剩餘的,那麼就會將這些標記任務都會加入全域性信用。 總體來說是如下的一套機制: ![mutator](https://img.luozhiyun.com/20210320234410.png) ### 完成標記 上面我們在 gcBgMarkWorker 中分析了,在標記完成後會呼叫 gcMarkDone 執行標記完成操作。 ```go func gcMarkDone() { semacquire(&work.markDoneSema) top: // 再次檢查任務是否已執行完畢 if !(gcphase == _GCmark && work.nwait == work.nproc && !gcMarkWorkAvailable(nil)) { semrelease(&work.markDoneSema) return } semacquire(&worldsema) gcMarkDoneFlushed = 0 systemstack(func() { gp := getg().m.curg casgstatus(gp, _Grunning, _Gwaiting) // 遍歷所有的 P forEachP(func(_p_ *p) { // 將 P 對應的write barrier buffer 中的物件加入到 gcWork 中 wbBufFlush1(_p_) // 將 gcWork 中的快取物件加入到全域性佇列中 _p_.gcw.dispose() // 表示 gcWork 的資料都已遷移到 全域性佇列中 if _p_.gcw.flushedWork { atomic.Xadd(&gcMarkDoneFlushed, 1) _p_.gcw.flushedWork = false } else if debugCachedWork { ... } ... }) casgstatus(gp, _Gwaiting, _Grunning) }) if gcMarkDoneFlushed != 0 { if debugCachedWork { // Release paused gcWorks. atomic.Xadd(&gcWorkPauseGen, 1) } semrelease(&worldsema) goto top } // 記錄完成標記階段開始的時間和STW開始的時間 now := nanotime() work.tMarkTerm = now work.pauseStart = now // 禁止G被搶佔 getg().m.preemptoff = "gcing" // STW systemstack(stopTheWorldWithSema) ... // 禁止輔助GC和後臺標記任務的執行 atomic.Store(&gcBlackenEnabled, 0) // 喚醒所有因為輔助GC而休眠的G gcWakeAllAssists() semrelease(&work.markDoneSema) schedEnableUser(true) // 計算下一次觸發gc需要的heap大小 nextTriggerRatio := gcController.endCycle() // 執行標記終止 gcMarkTermination(nextTriggerRatio) } ``` gcMarkDone 會呼叫 forEachP 函式遍歷所有的 P ,並將對應 P 中的 gcWork 中的任務移動到全域性佇列中,如果 gcWork 中有任務那麼會將 gcMarkDoneFlushed 加1,遍歷完所有的 P 之後會判斷如果 gcMarkDoneFlushed 不為0,那麼跳轉到 top 標記位繼續迴圈執行,直到本地佇列中沒有任務為止。 接下來會關將 gcBlackenEnabled 設定為0,表示關閉輔助標記協程以及後臺標記;喚醒被阻塞的輔助標記協程;呼叫 schedEnableUser 恢復使用者 Goroutine 的排程;需要注意的是,目前處在 STW 階段,所以被喚醒的 Goroutine 不會立馬執行,會等到 STW 結束後才執行。 最後呼叫 gcMarkTermination 執行標記終止。 ### 標記終止 ```go func gcMarkTermination(nextTriggerRatio float64) { // 禁止輔助GC和後臺標記任務的執行 atomic.Store(&gcBlackenEnabled, 0) // 將 GC 階段切換到 _GCmarktermination setGCPhase(_GCmarktermination) work.heap1 = memstats.heap_live // 記錄開始時間 startTime := nanotime() mp := acquirem() mp.preemptoff = "gcing" _g_ := getg() _g_.m.traceback = 2 gp := _g_.m.curg // 設定 G 的狀態為等待中 casgstatus(gp, _Grunning, _Gwaiting) gp.waitreason = waitReasonGarbageCollection // 切換到 g0 執行 systemstack(func() { // 開始 STW 中的標記 gcMark(startTime) }) systemstack(func() { work.heap2 = work.bytesMarked ... // 設定當前GC階段到關閉, 並禁用寫屏障 setGCPhase(_GCoff) // 喚醒後臺清掃任務 gcSweep(work.mode) }) _g_.m.traceback = 0 casgstatus(gp, _Gwaiting, _Grunning) // 統計以及重置清掃狀態相關程式碼 ... // 統計